refactor: 优化配置解析逻辑,增强对泛型类型的支持

- 在 config.py 中引入 get_args 和 get_origin 函数,以更好地处理 List 和 Dict 类型的解析。
- 更新了对 List[str] 和 List[Dict[str, str]] 的解析逻辑,增加了错误处理和日志记录。
- 在 keys_status.js 中将 filterValidKeys 函数替换为 filterAndSearchValidKeys,保留旧函数以避免破坏潜在的遗留调用。
- 在 keys_status.html 中新增选项以支持更多项目选择。
This commit is contained in:
snaily
2025-07-08 16:35:56 +08:00
parent d234f826f4
commit b3da021803
3 changed files with 102 additions and 134 deletions

View File

@@ -4,7 +4,7 @@
import datetime
import json
from typing import Any, Dict, List, Type
from typing import Any, Dict, List, Type, get_args, get_origin
from pydantic import ValidationError, ValidationInfo, field_validator
from pydantic_settings import BaseSettings
@@ -141,86 +141,106 @@ def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any:
logger = get_config_logger()
try:
# 处理 List[str]
if target_type == List[str]:
try:
parsed = json.loads(db_value)
if isinstance(parsed, list):
return [str(item) for item in parsed]
except json.JSONDecodeError:
origin_type = get_origin(target_type)
args = get_args(target_type)
# 处理 List 类型
if origin_type is list:
# 处理 List[str]
if args and args[0] == str:
try:
parsed = json.loads(db_value)
if isinstance(parsed, list):
return [str(item) for item in parsed]
except json.JSONDecodeError:
return [item.strip() for item in db_value.split(",") if item.strip()]
logger.warning(
f"Could not parse '{db_value}' as List[str] for key '{key}', falling back to comma split or empty list."
)
return [item.strip() for item in db_value.split(",") if item.strip()]
logger.warning(
f"Could not parse '{db_value}' as List[str] for key '{key}', falling back to comma split or empty list."
)
return [item.strip() for item in db_value.split(",") if item.strip()]
# 处理 Dict[str, float]
elif target_type == Dict[str, float]:
parsed_dict = {}
try:
parsed = json.loads(db_value)
if isinstance(parsed, dict):
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
else:
logger.warning(
f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}"
)
except (json.JSONDecodeError, ValueError, TypeError) as e1:
if isinstance(e1, json.JSONDecodeError) and "'" in db_value:
logger.warning(
f"Failed initial JSON parse for key '{key}'. Attempting to replace single quotes. Error: {e1}"
)
try:
corrected_db_value = db_value.replace("'", '"')
parsed = json.loads(corrected_db_value)
if isinstance(parsed, dict):
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
# 处理 List[Dict[str, str]]
elif args and get_origin(args[0]) is dict:
try:
parsed = json.loads(db_value)
if isinstance(parsed, list):
valid = all(
isinstance(item, dict)
and all(isinstance(k, str) for k in item.keys())
and all(isinstance(v, str) for v in item.values())
for item in parsed
)
if valid:
return parsed
else:
logger.warning(
f"Parsed DB value (after quote replacement) for key '{key}' is not a dictionary type. Value: {corrected_db_value}"
f"Invalid structure in List[Dict[str, str]] for key '{key}'. Value: {db_value}"
)
except (json.JSONDecodeError, ValueError, TypeError) as e2:
logger.error(
f"Could not parse '{db_value}' as Dict[str, float] for key '{key}' even after replacing quotes: {e2}. Returning empty dict."
)
else:
logger.error(
f"Could not parse '{db_value}' as Dict[str, float] for key '{key}': {e1}. Returning empty dict."
)
return parsed_dict
# 处理 List[Dict[str, str]]
elif target_type == List[Dict[str, str]]:
try:
parsed = json.loads(db_value)
if isinstance(parsed, list):
# 验证列表中的每个元素是否为字典,并且键和值都是字符串
valid = all(
isinstance(item, dict)
and all(isinstance(k, str) for k in item.keys())
and all(isinstance(v, str) for v in item.values())
for item in parsed
)
if valid:
return parsed
return []
else:
logger.warning(
f"Invalid structure in List[Dict[str, str]] for key '{key}'. Value: {db_value}"
f"Parsed DB value for key '{key}' is not a list type. Value: {db_value}"
)
return []
else:
logger.warning(
f"Parsed DB value for key '{key}' is not a list type. Value: {db_value}"
except json.JSONDecodeError:
logger.error(
f"Could not parse '{db_value}' as JSON for List[Dict[str, str]] for key '{key}'. Returning empty list."
)
return []
except json.JSONDecodeError:
logger.error(
f"Could not parse '{db_value}' as JSON for List[Dict[str, str]] for key '{key}'. Returning empty list."
)
return []
except Exception as e:
logger.error(
f"Error parsing List[Dict[str, str]] for key '{key}': {e}. Value: {db_value}. Returning empty list."
)
return []
except Exception as e:
logger.error(
f"Error parsing List[Dict[str, str]] for key '{key}': {e}. Value: {db_value}. Returning empty list."
)
return []
# 处理 Dict 类型
elif origin_type is dict:
# 处理 Dict[str, str]
if args and args == (str, str):
parsed_dict = {}
try:
parsed = json.loads(db_value)
if isinstance(parsed, dict):
parsed_dict = {str(k): str(v) for k, v in parsed.items()}
else:
logger.warning(
f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}"
)
except json.JSONDecodeError:
logger.error(f"Could not parse '{db_value}' as Dict[str, str] for key '{key}'. Returning empty dict.")
return parsed_dict
# 处理 Dict[str, float]
elif args and args == (str, float):
parsed_dict = {}
try:
parsed = json.loads(db_value)
if isinstance(parsed, dict):
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
else:
logger.warning(
f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}"
)
except (json.JSONDecodeError, ValueError, TypeError) as e1:
if isinstance(e1, json.JSONDecodeError) and "'" in db_value:
logger.warning(
f"Failed initial JSON parse for key '{key}'. Attempting to replace single quotes. Error: {e1}"
)
try:
corrected_db_value = db_value.replace("'", '"')
parsed = json.loads(corrected_db_value)
if isinstance(parsed, dict):
parsed_dict = {str(k): float(v) for k, v in parsed.items()}
else:
logger.warning(
f"Parsed DB value (after quote replacement) for key '{key}' is not a dictionary type. Value: {corrected_db_value}"
)
except (json.JSONDecodeError, ValueError, TypeError) as e2:
logger.error(
f"Could not parse '{db_value}' as Dict[str, float] for key '{key}' even after replacing quotes: {e2}. Returning empty dict."
)
else:
logger.error(
f"Could not parse '{db_value}' as Dict[str, float] for key '{key}': {e1}. Returning empty dict."
)
return parsed_dict
# 处理 bool
elif target_type == bool:
return db_value.lower() in ("true", "1", "yes", "on")
@@ -309,18 +329,12 @@ async def sync_initial_settings():
if parsed_db_value != memory_value:
# 检查类型是否匹配,以防解析函数返回了不兼容的类型
type_match = False
if target_type == List[str] and isinstance(
parsed_db_value, list
):
type_match = True
elif target_type == Dict[str, float] and isinstance(
parsed_db_value, dict
):
type_match = True
elif target_type not in (
List[str],
Dict[str, float],
) and isinstance(parsed_db_value, target_type):
origin_type = get_origin(target_type)
if origin_type: # It's a generic type
if isinstance(parsed_db_value, origin_type):
type_match = True
# It's a non-generic type, or a specific generic we want to handle
elif isinstance(parsed_db_value, target_type):
type_match = True
if type_match:

View File

@@ -817,58 +817,11 @@ function toggleSection(header, sectionId) {
}
}
// 筛选有效密钥(根据失败次数阈值)并更新批量操作状态
// filterValidKeys 函数已被 filterAndSearchValidKeys 替代,此函数保留为空或可移除
function filterValidKeys() {
const thresholdInput = document.getElementById("failCountThreshold");
const validKeysList = document.getElementById("validKeys"); // Get the UL element
if (!validKeysList) return; // Exit if the list doesn't exist
const validKeyItems = validKeysList.querySelectorAll("li[data-key]"); // Select li elements within the list
// 读取阈值如果输入无效或为空则默认为0不过滤
const threshold = parseInt(thresholdInput.value, 10);
const filterThreshold = isNaN(threshold) || threshold < 0 ? 0 : threshold;
let hasVisibleItems = false;
validKeyItems.forEach((item) => {
// 确保只处理包含 data-fail-count 的 li 元素
if (item.dataset.failCount !== undefined) {
const failCount = parseInt(item.dataset.failCount, 10);
// 如果失败次数大于等于阈值,则显示,否则隐藏
if (failCount >= filterThreshold) {
item.style.display = "flex"; // 使用 flex 因为 li 现在是 flex 容器
hasVisibleItems = true;
} else {
item.style.display = "none"; // 隐藏
// 如果隐藏了一个项,取消其选中状态
const checkbox = item.querySelector(".key-checkbox");
if (checkbox && checkbox.checked) {
checkbox.checked = false;
}
}
}
});
// 更新有效密钥的批量操作状态和全选复选框
updateBatchActions("valid");
// 处理"暂无有效密钥"消息
const noMatchMsgId = "no-valid-keys-msg";
let noMatchMsg = validKeysList.querySelector(`#${noMatchMsgId}`);
const initialKeyCount = validKeysList.querySelectorAll("li[data-key]").length; // 获取初始密钥数量
if (!hasVisibleItems && initialKeyCount > 0) {
// 仅当初始有密钥但现在都不可见时显示
if (!noMatchMsg) {
noMatchMsg = document.createElement("li");
noMatchMsg.id = noMatchMsgId;
noMatchMsg.className = "text-center text-gray-500 py-4 col-span-full";
noMatchMsg.textContent = "没有符合条件的有效密钥";
validKeysList.appendChild(noMatchMsg);
}
noMatchMsg.style.display = "";
} else if (noMatchMsg) {
noMatchMsg.style.display = "none";
}
// This function is now handled by filterAndSearchValidKeys
// Kept for now to avoid breaking any potential legacy calls, but should be removed later.
filterAndSearchValidKeys();
}
// --- Initialization Helper Functions ---

View File

@@ -1245,6 +1245,7 @@ endblock %} {% block head_extra_styles %}
<option value="20">20</option>
<option value="50">50</option>
<option value="100">100</option>
<option value="500">500</option>
</select>
<span class="text-sm select-none font-semibold" style="color: #1f2937 !important;"></span>
</div>