feat(encoding): enhance encoding detection with confidence threshold

This commit is contained in:
InfinityPacer
2024-11-27 12:33:57 +08:00
parent 2086651dbe
commit 83fc474dbe
4 changed files with 117 additions and 27 deletions

View File

@@ -1,5 +1,7 @@
import re
from typing import Any, Optional, Union
import chardet
import requests
import urllib3
from requests import Response, Session
@@ -273,3 +275,108 @@ class RequestUtils:
cache_headers["Cache-Control"] = f"max-age={max_age}"
return cache_headers
@staticmethod
def detect_encoding_from_html_response(response: Response,
compatible_mode: bool = False, confidence_threshold: float = 0.8):
"""
根据HTML响应内容探测编码信息
:param response: HTTP 响应对象
:param compatible_mode: 是否使用兼容模式,默认为 False (性能模式)
:param confidence_threshold: chardet 检测置信度阈值,默认为 0.8
:return: 解析得到的字符编码
"""
fallback_encoding = None
try:
if compatible_mode:
# 兼容模式使用chardet分析后再处理 BOM 和 meta 信息
# 1. 使用 chardet 库进一步分析内容
detection = chardet.detect(response.content)
if detection["confidence"] > confidence_threshold:
return detection.get("encoding")
# 保存 chardet 的结果备用
fallback_encoding = detection.get("encoding")
# 2. 检查响应体中的 BOM 标记(例如 UTF-8 BOM
if response.content[:3] == b"\xef\xbb\xbf": # UTF-8 BOM
return "utf-8"
# 3. 如果是 HTML 响应体,检查其中的 <meta charset="..."> 标签
if re.search(r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE):
return "utf-8"
# 4. 尝试从 response headers 中获取编码信息
content_type = response.headers.get("Content-Type", "")
if re.search(r"charset=[\"']?utf-8[\"']?", content_type, re.IGNORECASE):
return "utf-8"
else:
# 性能模式:优先从 headers 和 BOM 标记获取,最后使用 chardet 分析
# 1. 尝试从 response headers 中获取编码信息
content_type = response.headers.get("Content-Type", "")
if re.search(r"charset=[\"']?utf-8[\"']?", content_type, re.IGNORECASE):
return "utf-8"
# 暂不支持直接提取字符集仅提取UTF8
# match = re.search(r"charset=[\"']?([^\"';\s]+)", content_type, re.IGNORECASE)
# if match:
# return match.group(1)
# 2. 检查响应体中的 BOM 标记(例如 UTF-8 BOM
if response.content[:3] == b"\xef\xbb\xbf":
return "utf-8"
# 3. 如果是 HTML 响应体,检查其中的 <meta charset="..."> 标签
if re.search(r"charset=[\"']?utf-8[\"']?", response.text, re.IGNORECASE):
return "utf-8"
# 暂不支持直接提取字符集仅提取UTF8
# match = re.search(r"<meta[^>]+charset=[\"']?([^\"'>\s]+)", response.text, re.IGNORECASE)
# if match:
# return match.group(1)
# 4. 使用 chardet 库进一步分析内容
detection = chardet.detect(response.content)
if detection.get("confidence", 0) > confidence_threshold:
return detection.get("encoding")
# 保存 chardet 的结果备用
fallback_encoding = detection.get("encoding")
# 5. 如果上述方法都无法确定,信任 chardet 的结果(即使置信度较低),否则返回默认字符集
return fallback_encoding or "utf-8"
except Exception as e:
logger.debug(f"Error when detect_encoding_from_response: {str(e)}")
return fallback_encoding or "utf-8"
@staticmethod
def get_decoded_html_content(response: Response,
compatible_mode: bool = False, confidence_threshold: float = 0.8) -> str:
"""
获取HTML响应的解码文本内容
:param response: HTTP 响应对象
:param compatible_mode: 是否使用兼容模式,默认为 False (性能模式)
:param confidence_threshold: chardet 检测置信度阈值,默认为 0.8
:return: 解码后的响应文本内容
"""
try:
if not response:
return ""
if response.content:
# 1. 获取编码信息
encoding = (RequestUtils.detect_encoding_from_html_response(response, compatible_mode,
confidence_threshold)
or response.apparent_encoding)
# 2. 根据解析得到的编码进行解码
try:
# 尝试用推测的编码解码
return response.content.decode(encoding)
except Exception as e:
logger.debug(f"Decoding failed, error message: {str(e)}")
# 如果解码失败,尝试 fallback 使用 apparent_encoding
response.encoding = response.apparent_encoding
return response.text
else:
return response.text
except Exception as e:
logger.debug(f"Error when getting decoded content: {str(e)}")
return response.text