Compare commits

...

32 Commits

Author SHA1 Message Date
jxxghp
f208b65570 更新 version.py 2024-11-29 08:59:55 +08:00
jxxghp
8a0a530036 Merge pull request #3279 from wikrin/v2 2024-11-29 07:36:34 +08:00
Attente
76643f13ed Update system.py 2024-11-29 07:33:02 +08:00
Attente
6992284a77 fix(api): 修复规则测试未获取到媒体信息导致的过滤失败问题 2024-11-29 07:25:08 +08:00
jxxghp
9a142799cd Merge pull request #3274 from InfinityPacer/feature/encoding 2024-11-28 17:29:22 +08:00
InfinityPacer
027d1567c3 feat(encoding): set PERFORMANCE_MODE to enabled by default 2024-11-28 17:07:14 +08:00
jxxghp
65af737dfd Merge pull request #3272 from wikrin/transfer 2024-11-28 07:23:25 +08:00
jxxghp
48aa0e3d0b Merge pull request #3271 from wikrin/v2 2024-11-28 07:22:16 +08:00
jxxghp
b4e31893ff Merge pull request #3268 from mackerel-12138/fix_scraper 2024-11-28 07:21:43 +08:00
Attente
4f1b95352a 改进手动整理逻辑 关联后端 jxxghp/MoviePilot-Frontend#255 2024-11-28 05:39:26 +08:00
Attente
ca664cb569 fix: 修复批量整理时媒体库目录匹配不正确的问题 2024-11-28 05:19:09 +08:00
zhanglijun
fe4ea73286 修复季nfo刮削错误, 优化季标题取值 2024-11-27 23:27:08 +08:00
jxxghp
9e9cca6de4 Merge pull request #3262 from InfinityPacer/feature/encoding 2024-11-27 16:25:46 +08:00
InfinityPacer
2e7e74c803 feat(encoding): update configuration to performance mode 2024-11-27 13:52:17 +08:00
InfinityPacer
916597047d Merge branch 'v2' of https://github.com/jxxghp/MoviePilot into feature/encoding 2024-11-27 12:52:01 +08:00
InfinityPacer
83fc474dbe feat(encoding): enhance encoding detection with confidence threshold 2024-11-27 12:33:57 +08:00
jxxghp
f67bf49e69 Merge pull request #3255 from InfinityPacer/feature/event 2024-11-27 06:59:54 +08:00
jxxghp
bf9043f526 Merge pull request #3254 from mackerel-12138/v2 2024-11-27 06:58:47 +08:00
InfinityPacer
a98de604a1 refactor(event): rename SmartRename to TransferRename 2024-11-27 00:50:34 +08:00
InfinityPacer
e160a745a7 fix(event): correct visualize_handlers 2024-11-27 00:49:37 +08:00
zhanglijun
7f2c6ef167 fix: 增加入参判断 2024-11-26 22:25:42 +08:00
jxxghp
2086651dbe Merge pull request #3235 from wikrin/fix 2024-11-26 22:17:32 +08:00
zhanglijun
132fde2308 修复季海报下载路径和第0季海报命名 2024-11-26 22:01:00 +08:00
jxxghp
4e27a1e623 fix #3247 2024-11-26 08:25:01 +08:00
Attente
a453831deb get_dir增加入参
- `include_unsorted`用于表示可否`包含`整理方式`为`不整理`的目录配置
2024-11-26 03:11:25 +08:00
jxxghp
1035ceb4ac Merge pull request #3245 from wikrin/v2 2024-11-25 23:04:15 +08:00
Attente
b7cb917347 fix(transfer): add library type and category folder support
- Add library_type_folder and library_category_folder parameters to the transfer function
- This enhances the transfer functionality by allowing sorting files into folders based on library type and category
2024-11-25 23:02:17 +08:00
jxxghp
680ad164dc Merge pull request #3236 from InfinityPacer/feature/scheduler 2024-11-25 17:54:05 +08:00
InfinityPacer
aed68253e9 feat(scheduler): expose internal methods for external invocation 2024-11-25 16:33:17 +08:00
InfinityPacer
b83c7a5656 feat(scheduler): support plugin method arguments via func_kwargs 2024-11-25 16:31:30 +08:00
InfinityPacer
491456b0a2 feat(scheduler): support plugin replacement for system services 2024-11-25 16:30:11 +08:00
Attente
84465a6536 不整理目录的下载路径可以被下载器获取
修改自动匹配源存储器类型入参
2024-11-25 13:51:04 +08:00
18 changed files with 282 additions and 141 deletions

View File

@@ -16,6 +16,7 @@ from app import schemas
from app.chain.search import SearchChain
from app.chain.system import SystemChain
from app.core.config import global_vars, settings
from app.core.metainfo import MetaInfo
from app.core.module import ModuleManager
from app.core.security import verify_apitoken, verify_resource_token, verify_token
from app.db.models import User
@@ -385,9 +386,12 @@ def ruletest(title: str,
if not rulegroup:
return schemas.Response(success=False, message=f"过滤规则组 {rulegroup_name} 不存在!")
# 根据标题查询媒体信息
media_info =SearchChain().recognize_media(MetaInfo(title=title, subtitle=subtitle))
# 过滤
result = SearchChain().filter_torrents(rule_groups=[rulegroup.name],
torrent_list=[torrent])
torrent_list=[torrent], mediainfo=media_info)
if not result:
return schemas.Response(success=False, message="不符合过滤规则!")
return schemas.Response(success=True, data={

View File

@@ -20,24 +20,42 @@ router = APIRouter()
class ManualTransferItem(BaseModel):
fileitem: FileItem = None,
logid: Optional[int] = None,
target_storage: Optional[str] = None,
target_path: Optional[str] = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
type_name: Optional[str] = None,
season: Optional[int] = None,
transfer_type: Optional[str] = None,
episode_format: Optional[str] = None,
episode_detail: Optional[str] = None,
episode_part: Optional[str] = None,
episode_offset: Optional[str] = None,
min_filesize: Optional[int] = 0,
scrape: bool = False,
library_type_folder: bool = False,
library_category_folder: bool = False,
from_history: bool = False
# 文件项
fileitem: FileItem = None
# 日志ID
logid: Optional[int] = None
# 目标存储
target_storage: Optional[str] = None
# 目标路径
target_path: Optional[str] = None
# TMDB ID
tmdbid: Optional[int] = None
# 豆瓣ID
doubanid: Optional[str] = None
# 类型
type_name: Optional[str] = None
# 季号
season: Optional[int] = None
# 整理方式
transfer_type: Optional[str] = None
# 自定义格式
episode_format: Optional[str] = None
# 指定集数
episode_detail: Optional[str] = None
# 指定PART
episode_part: Optional[str] = None
# 集数偏移
episode_offset: Optional[str] = None
# 最小文件大小
min_filesize: Optional[int] = 0
# 刮削
scrape: bool = False
# 媒体库类型子目录
library_type_folder: Optional[bool] = None
# 媒体库类别子目录
library_category_folder: Optional[bool] = None
# 复用历史识别信息
from_history: Optional[bool] = False
@router.get("/name", summary="查询整理后的名称", response_model=schemas.Response)

View File

@@ -256,7 +256,7 @@ class DownloadChain(ChainBase):
download_dir = Path(save_path)
else:
# 根据媒体信息查询下载目录配置
dir_info = self.directoryhelper.get_dir(_media, storage="local")
dir_info = self.directoryhelper.get_dir(_media, storage="local", include_unsorted=True)
# 拼装子目录
if dir_info:
# 一级目录

View File

@@ -426,7 +426,7 @@ class MediaChain(ChainBase, metaclass=Singleton):
# 电影目录
if is_bluray_folder(fileitem):
# 原盘目录
nfo_path = filepath / "movie.nfo"
nfo_path = filepath / (filepath.name + ".nfo")
if not overwrite and self.storagechain.get_file_item(storage=fileitem.storage, path=nfo_path):
logger.info(f"已存在nfo文件{nfo_path}")
return
@@ -547,9 +547,11 @@ class MediaChain(ChainBase, metaclass=Singleton):
continue
# 下载图片
content = __download_image(image_url)
# 保存图片文件到当前目录
# 保存图片文件到剧集目录
if content:
__save_file(_fileitem=fileitem, _path=image_path, _content=content)
if not parent:
parent = self.storagechain.get_parent_item(fileitem)
__save_file(_fileitem=parent, _path=image_path, _content=content)
# 判断当前目录是不是剧集根目录
if not season_meta.season:
# 是否已存在

View File

@@ -395,26 +395,26 @@ class TransferChain(ChainBase):
if not target_directory:
if src_match:
# 按源目录匹配,以便找到更合适的目录配置
target_directory = self.directoryhelper.get_dir(media=file_mediainfo,
dir_info = self.directoryhelper.get_dir(media=file_mediainfo,
storage=file_item.storage,
src_path=file_path,
target_storage=target_storage)
elif target_path:
# 指定目标路径,`手动整理`场景下使用,忽略源目录匹配,使用指定目录匹配
target_directory = self.directoryhelper.get_dir(media=file_mediainfo,
dir_info = self.directoryhelper.get_dir(media=file_mediainfo,
dest_path=target_path,
target_storage=target_storage)
else:
# 未指定目标路径,根据媒体信息获取目标目录
target_directory = self.directoryhelper.get_dir(file_mediainfo,
storage=target_storage,
dir_info = self.directoryhelper.get_dir(file_mediainfo,
storage=file_item.storage,
target_storage=target_storage)
# 执行整理
transferinfo: TransferInfo = self.transfer(fileitem=file_item,
meta=file_meta,
mediainfo=file_mediainfo,
target_directory=target_directory,
target_directory=target_directory or dir_info,
target_storage=target_storage,
target_path=target_path,
transfer_type=transfer_type,
@@ -693,8 +693,8 @@ class TransferChain(ChainBase):
epformat: EpisodeFormat = None,
min_filesize: int = 0,
scrape: bool = None,
library_type_folder: bool = False,
library_category_folder: bool = False,
library_type_folder: bool = None,
library_category_folder: bool = None,
force: bool = False) -> Tuple[bool, Union[str, list]]:
"""
手动整理,支持复杂条件,带进度显示
@@ -759,6 +759,8 @@ class TransferChain(ChainBase):
epformat=epformat,
min_filesize=min_filesize,
scrape=scrape,
library_type_folder=library_type_folder,
library_category_folder=library_category_folder,
force=force)
return state, errmsg

View File

@@ -219,6 +219,10 @@ class ConfigModel(BaseModel):
BIG_MEMORY_MODE: bool = False
# 全局图片缓存,将媒体图片缓存到本地
GLOBAL_IMAGE_CACHE: bool = False
# 是否启用编码探测的性能模式
ENCODING_DETECTION_PERFORMANCE_MODE: bool = True
# 编码探测的最低置信度阈值
ENCODING_DETECTION_MIN_CONFIDENCE: float = 0.8
# 允许的图片缓存域名
SECURITY_IMAGE_DOMAINS: List[str] = Field(
default_factory=lambda: ["image.tmdb.org",

View File

@@ -233,23 +233,29 @@ class EventManager(metaclass=Singleton):
可视化所有事件处理器,包括是否被禁用的状态
:return: 处理器列表,包含事件类型、处理器标识符、优先级(如果有)和状态
"""
def parse_handler_data(data):
"""
解析处理器数据,判断是否包含优先级
:param data: 订阅者数据,可能是元组或单一值
:return: (priority, handler),若没有优先级则返回 (None, handler)
"""
if isinstance(data, tuple) and len(data) == 2:
return data
return None, data
handler_info = []
# 统一处理广播事件和链式事件
for event_type, subscribers in {**self.__broadcast_subscribers, **self.__chain_subscribers}.items():
for handler_data in subscribers:
if isinstance(subscribers, dict):
priority, handler = handler_data
else:
priority = None
handler = handler_data
# 获取处理器的唯一标识符
handler_id = self.__get_handler_identifier(handler)
for handler_identifier, handler_data in subscribers.items():
# 解析优先级和处理器
priority, handler = parse_handler_data(handler_data)
# 检查处理器的启用状态
status = "enabled" if self.__is_handler_enabled(handler) else "disabled"
# 构建处理器信息字典
handler_dict = {
"event_type": event_type.value,
"handler_identifier": handler_id,
"handler_identifier": handler_identifier,
"status": status
}
if priority is not None:

View File

@@ -526,7 +526,8 @@ class PluginManager(metaclass=Singleton):
"name": "服务名称",
"trigger": "触发器cron、interval、date、CronTrigger.from_crontab()",
"func": self.xxx,
"kwargs": {} # 定时器参数
"kwargs": {} # 定时器参数,
"func_kwargs": {} # 方法参数
}]
"""
ret_services = []

View File

@@ -49,13 +49,14 @@ class DirectoryHelper:
"""
return [d for d in self.get_library_dirs() if d.library_storage == "local"]
def get_dir(self, media: MediaInfo,
def get_dir(self, media: MediaInfo, include_unsorted: bool = False,
storage: str = None, src_path: Path = None,
target_storage: str = None, dest_path: Path = None
) -> Optional[schemas.TransferDirectoryConf]:
"""
根据媒体信息获取下载目录、媒体库目录配置
:param media: 媒体信息
:param include_unsorted: 包含不整理目录
:param storage: 源存储类型
:param target_storage: 目标存储类型
:param fileitem: 文件项,使用文件路径匹配
@@ -73,7 +74,7 @@ class DirectoryHelper:
# 按照配置顺序查找
for d in dirs:
# 没有启用整理的目录
if not d.monitor_type:
if not d.monitor_type and not include_unsorted:
continue
# 源存储类型不匹配
if storage and d.storage != storage:

View File

@@ -17,7 +17,7 @@ from app.log import logger
from app.modules import _ModuleBase
from app.modules.filemanager.storages import StorageBase
from app.schemas import TransferInfo, ExistMediaInfo, TmdbEpisode, TransferDirectoryConf, FileItem, StorageUsage
from app.schemas.event import SmartRenameEventData
from app.schemas.event import TransferRenameEventData
from app.schemas.types import MediaType, ModuleType, ChainEventType
from app.utils.system import SystemUtils
@@ -1213,16 +1213,16 @@ class FileManagerModule(_ModuleBase):
logger.debug(f"Initial render string: {render_str}")
# 发送智能重命名事件
event_data = SmartRenameEventData(
event_data = TransferRenameEventData(
template_string=template_string,
rename_dict=rename_dict,
render_str=render_str,
path=path
)
event = eventmanager.send_event(ChainEventType.SmartRename, event_data)
event = eventmanager.send_event(ChainEventType.TransferRename, event_data)
# 检查事件返回的结果
if event and event.event_data:
event_data: SmartRenameEventData = event.event_data
event_data: TransferRenameEventData = event.event_data
if event_data.updated and event_data.updated_str:
logger.debug(f"Render string updated by event: "
f"{render_str} -> {event_data.updated_str} (source: {event_data.source})")

View File

@@ -344,11 +344,9 @@ class SiteParserBase(metaclass=ABCMeta):
logger.warn(
f"{self._site_name} 检测到Cloudflare请更新Cookie和UA")
return ""
if re.search(r"charset=\"?utf-8\"?", res.text, re.IGNORECASE):
res.encoding = "utf-8"
else:
res.encoding = res.apparent_encoding
return res.text
return RequestUtils.get_decoded_html_content(res,
settings.ENCODING_DETECTION_PERFORMANCE_MODE,
settings.ENCODING_DETECTION_MIN_CONFIDENCE)
return ""

View File

@@ -5,7 +5,6 @@ import traceback
from typing import List
from urllib.parse import quote, urlencode, urlparse, parse_qs
import chardet
from jinja2 import Template
from pyquery import PyQuery
from ruamel.yaml import CommentedMap
@@ -250,27 +249,9 @@ class TorrentSpider:
referer=self.referer,
proxies=self.proxies
).get_res(searchurl, allow_redirects=True)
if ret is not None:
# 使用chardet检测字符编码
raw_data = ret.content
if raw_data:
try:
result = chardet.detect(raw_data)
encoding = result['encoding']
# 解码为字符串
page_source = raw_data.decode(encoding)
except Exception as e:
logger.debug(f"chardet解码失败{str(e)}")
# 探测utf-8解码
if re.search(r"charset=\"?utf-8\"?", ret.text, re.IGNORECASE):
ret.encoding = "utf-8"
else:
ret.encoding = ret.apparent_encoding
page_source = ret.text
else:
page_source = ret.text
else:
page_source = ""
page_source = RequestUtils.get_decoded_html_content(ret,
settings.ENCODING_DETECTION_PERFORMANCE_MODE,
settings.ENCODING_DETECTION_MIN_CONFIDENCE)
# 解析
return self.parse(page_source)

View File

@@ -32,7 +32,7 @@ class TmdbScraper:
else:
if season is not None:
# 查询季信息
seasoninfo = self.tmdb.get_tv_season_detail(mediainfo.tmdb_id, meta.begin_season)
seasoninfo = self.tmdb.get_tv_season_detail(mediainfo.tmdb_id, season)
if episode:
# 集元数据文件
episodeinfo = self.__get_episode_detail(seasoninfo, meta.begin_episode)
@@ -102,7 +102,11 @@ class TmdbScraper:
ext = Path(seasoninfo.get('poster_path')).suffix
# URL
url = f"https://{settings.TMDB_IMAGE_DOMAIN}/t/p/original{seasoninfo.get('poster_path')}"
image_name = f"season{sea_seq}-poster{ext}"
# S0海报格式不同
if season == 0:
image_name = f"season-specials-poster{ext}"
else:
image_name = f"season{sea_seq}-poster{ext}"
return image_name, url
return "", ""
@@ -229,7 +233,7 @@ class TmdbScraper:
xoutline = DomUtils.add_node(doc, root, "outline")
xoutline.appendChild(doc.createCDATASection(seasoninfo.get("overview") or ""))
# 标题
DomUtils.add_node(doc, root, "title", "%s" % season)
DomUtils.add_node(doc, root, "title", seasoninfo.get("name") or "%s" % season)
# 发行日期
DomUtils.add_node(doc, root, "premiered", seasoninfo.get("air_date") or "")
DomUtils.add_node(doc, root, "releasedate", seasoninfo.get("air_date") or "")

View File

@@ -41,7 +41,7 @@ class Scheduler(metaclass=Singleton):
# 退出事件
_event = threading.Event()
# 锁
_lock = threading.Lock()
_lock = threading.RLock()
# 各服务的运行状态
_jobs = {}
# 用户认证失败次数
@@ -54,53 +54,6 @@ class Scheduler(metaclass=Singleton):
"""
初始化定时服务
"""
def clear_cache():
"""
清理缓存
"""
TorrentsChain().clear_cache()
SchedulerChain().clear_cache()
def user_auth():
"""
用户认证检查
"""
if SitesHelper().auth_level >= 2:
return
# 最大重试次数
__max_try__ = 30
if self._auth_count > __max_try__:
SchedulerChain().messagehelper.put(title=f"用户认证失败",
message="用户认证失败次数过多,将不再尝试认证!",
role="system")
return
logger.info("用户未认证,正在尝试认证...")
auth_conf = SystemConfigOper().get(SystemConfigKey.UserSiteAuthParams)
if auth_conf:
status, msg = SitesHelper().check_user(**auth_conf)
else:
status, msg = SitesHelper().check_user()
if status:
self._auth_count = 0
logger.info(f"{msg} 用户认证成功")
SchedulerChain().post_message(
Notification(
mtype=NotificationType.Manual,
title="MoviePilot用户认证成功",
text=f"使用站点:{msg}",
link=settings.MP_DOMAIN('#/site')
)
)
PluginManager().init_config()
self.init_plugin_jobs()
else:
self._auth_count += 1
logger.error(f"用户认证失败:{msg},共失败 {self._auth_count}")
if self._auth_count >= __max_try__:
logger.error("用户认证失败次数过多,将不再尝试认证!")
# 各服务的运行状态
self._jobs = {
"cookiecloud": {
@@ -146,12 +99,12 @@ class Scheduler(metaclass=Singleton):
},
"clear_cache": {
"name": "缓存清理",
"func": clear_cache,
"func": self.clear_cache,
"running": False,
},
"user_auth": {
"name": "用户认证检查",
"func": user_auth,
"func": self.user_auth,
"running": False,
},
"scheduler_job": {
@@ -434,11 +387,13 @@ class Scheduler(metaclass=Singleton):
try:
sid = f"{service['id']}"
job_id = sid.split("|")[0]
self.remove_plugin_job(pid, job_id)
self._jobs[job_id] = {
"func": service["func"],
"name": service["name"],
"pid": pid,
"plugin_name": plugin_name,
"kwargs": service.get("func_kwargs") or {},
"running": False,
}
self._scheduler.add_job(
@@ -446,7 +401,7 @@ class Scheduler(metaclass=Singleton):
service["trigger"],
id=sid,
name=service["name"],
**service["kwargs"],
**(service.get("kwargs") or {}),
kwargs={"job_id": job_id},
replace_existing=True
)
@@ -457,23 +412,34 @@ class Scheduler(metaclass=Singleton):
message=str(e),
role="system")
def remove_plugin_job(self, pid: str):
def remove_plugin_job(self, pid: str, job_id: str = None):
"""
移除插件定时服务
移除定时服务,可以是单个服务(包括默认服务)或整个插件的所有服务
:param pid: 插件 ID
:param job_id: 可选,指定要移除的单个服务的 job_id。如果不提供则移除该插件的所有服务当移除单个服务时默认服务也包含在内
"""
if not self._scheduler:
return
with self._lock:
# 获取插件名称
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 先从 _jobs 中查找匹配的服务
jobs_to_remove = [(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid]
if job_id:
# 移除单个服务
service = self._jobs.pop(job_id, None)
if not service:
return
jobs_to_remove = [(job_id, service)]
else:
# 移除插件的所有服务
jobs_to_remove = [
(job_id, service) for job_id, service in self._jobs.items() if service.get("pid") == pid
]
for job_id, _ in jobs_to_remove:
self._jobs.pop(job_id, None)
if not jobs_to_remove:
return
plugin_name = PluginManager().get_plugin_attr(pid, "plugin_name")
# 遍历移除任务
for job_id, service in jobs_to_remove:
try:
# 移除服务
self._jobs.pop(job_id, None)
# 在调度器中查找并移除对应的 job
job_removed = False
for job in list(self._scheduler.get_jobs()):
@@ -557,3 +523,50 @@ class Scheduler(metaclass=Singleton):
logger.info("定时任务停止完成")
except Exception as e:
logger.error(f"停止定时任务失败::{str(e)} - {traceback.format_exc()}")
@staticmethod
def clear_cache():
"""
清理缓存
"""
TorrentsChain().clear_cache()
SchedulerChain().clear_cache()
def user_auth(self):
"""
用户认证检查
"""
if SitesHelper().auth_level >= 2:
return
# 最大重试次数
__max_try__ = 30
if self._auth_count > __max_try__:
SchedulerChain().messagehelper.put(title=f"用户认证失败",
message="用户认证失败次数过多,将不再尝试认证!",
role="system")
return
logger.info("用户未认证,正在尝试认证...")
auth_conf = SystemConfigOper().get(SystemConfigKey.UserSiteAuthParams)
if auth_conf:
status, msg = SitesHelper().check_user(**auth_conf)
else:
status, msg = SitesHelper().check_user()
if status:
self._auth_count = 0
logger.info(f"{msg} 用户认证成功")
SchedulerChain().post_message(
Notification(
mtype=NotificationType.Manual,
title="MoviePilot用户认证成功",
text=f"使用站点:{msg}",
link=settings.MP_DOMAIN('#/site')
)
)
PluginManager().init_config()
self.init_plugin_jobs()
else:
self._auth_count += 1
logger.error(f"用户认证失败:{msg},共失败 {self._auth_count}")
if self._auth_count >= __max_try__:
logger.error("用户认证失败次数过多,将不再尝试认证!")

View File

@@ -117,9 +117,9 @@ class CommandRegisterEventData(ChainEventData):
source: str = Field("未知拦截源", description="拦截源")
class SmartRenameEventData(ChainEventData):
class TransferRenameEventData(ChainEventData):
"""
SmartRename 事件的数据模型
TransferRename 事件的数据模型
Attributes:
# 输入参数

View File

@@ -68,8 +68,8 @@ class ChainEventType(Enum):
AuthIntercept = "auth.intercept"
# 命令注册
CommandRegister = "command.register"
# 智能重命名
SmartRename = "SmartRename"
# 整理重命名
TransferRename = "transfer.rename"
# 系统配置Key字典

View File

@@ -1,5 +1,7 @@
import re
from typing import Any, Optional, Union
import chardet
import requests
import urllib3
from requests import Response, Session
@@ -273,3 +275,108 @@ class RequestUtils:
cache_headers["Cache-Control"] = f"max-age={max_age}"
return cache_headers
@staticmethod
def detect_encoding_from_html_response(response: Response,
performance_mode: bool = False, confidence_threshold: float = 0.8):
"""
根据HTML响应内容探测编码信息
:param response: HTTP 响应对象
:param performance_mode: 是否使用性能模式,默认为 False (兼容模式)
:param confidence_threshold: chardet 检测置信度阈值,默认为 0.8
:return: 解析得到的字符编码
"""
fallback_encoding = None
try:
if not performance_mode:
# 兼容模式使用chardet分析后再处理 BOM 和 meta 信息
# 1. 使用 chardet 库进一步分析内容
detection = chardet.detect(response.content)
if detection["confidence"] > confidence_threshold:
return detection.get("encoding")
# 保存 chardet 的结果备用
fallback_encoding = detection.get("encoding")
# 2. 检查响应体中的 BOM 标记(例如 UTF-8 BOM
if response.content[:3] == b"\xef\xbb\xbf": # UTF-8 BOM
return "utf-8"
# 3. 如果是 HTML 响应体,检查其中的 <meta charset="..."> 标签
if re.search(r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE):
return "utf-8"
# 4. 尝试从 response headers 中获取编码信息
content_type = response.headers.get("Content-Type", "")
if re.search(r"charset=[\"']?utf-8[\"']?", content_type, re.IGNORECASE):
return "utf-8"
else:
# 性能模式:优先从 headers 和 BOM 标记获取,最后使用 chardet 分析
# 1. 尝试从 response headers 中获取编码信息
content_type = response.headers.get("Content-Type", "")
if re.search(r"charset=[\"']?utf-8[\"']?", content_type, re.IGNORECASE):
return "utf-8"
# 暂不支持直接提取字符集仅提取UTF8
# match = re.search(r"charset=[\"']?([^\"';\s]+)", content_type, re.IGNORECASE)
# if match:
# return match.group(1)
# 2. 检查响应体中的 BOM 标记(例如 UTF-8 BOM
if response.content[:3] == b"\xef\xbb\xbf":
return "utf-8"
# 3. 如果是 HTML 响应体,检查其中的 <meta charset="..."> 标签
if re.search(r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE):
return "utf-8"
# 暂不支持直接提取字符集仅提取UTF8
# match = re.search(r"<meta[^>]+charset=[\"']?([^\"'>\s]+)", response.text, re.IGNORECASE)
# if match:
# return match.group(1)
# 4. 使用 chardet 库进一步分析内容
detection = chardet.detect(response.content)
if detection.get("confidence", 0) > confidence_threshold:
return detection.get("encoding")
# 保存 chardet 的结果备用
fallback_encoding = detection.get("encoding")
# 5. 如果上述方法都无法确定,信任 chardet 的结果(即使置信度较低),否则返回默认字符集
return fallback_encoding or "utf-8"
except Exception as e:
logger.debug(f"Error when detect_encoding_from_response: {str(e)}")
return fallback_encoding or "utf-8"
@staticmethod
def get_decoded_html_content(response: Response,
performance_mode: bool = False, confidence_threshold: float = 0.8) -> str:
"""
获取HTML响应的解码文本内容
:param response: HTTP 响应对象
:param performance_mode: 是否使用性能模式,默认为 False (兼容模式)
:param confidence_threshold: chardet 检测置信度阈值,默认为 0.8
:return: 解码后的响应文本内容
"""
try:
if not response:
return ""
if response.content:
# 1. 获取编码信息
encoding = (RequestUtils.detect_encoding_from_html_response(response, performance_mode,
confidence_threshold)
or response.apparent_encoding)
# 2. 根据解析得到的编码进行解码
try:
# 尝试用推测的编码解码
return response.content.decode(encoding)
except Exception as e:
logger.debug(f"Decoding failed, error message: {str(e)}")
# 如果解码失败,尝试 fallback 使用 apparent_encoding
response.encoding = response.apparent_encoding
return response.text
else:
return response.text
except Exception as e:
logger.debug(f"Error when getting decoded content: {str(e)}")
return response.text

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.1.0'
FRONTEND_VERSION = 'v2.1.0'
APP_VERSION = 'v2.1.1'
FRONTEND_VERSION = 'v2.1.1'