diff --git a/app/config/config.py b/app/config/config.py index 5bc79d4..5d437f1 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -4,7 +4,7 @@ import datetime import json -from typing import Any, Dict, List, Type +from typing import Any, Dict, List, Type, get_args, get_origin from pydantic import ValidationError, ValidationInfo, field_validator from pydantic_settings import BaseSettings @@ -141,86 +141,106 @@ def _parse_db_value(key: str, db_value: str, target_type: Type) -> Any: logger = get_config_logger() try: - # 处理 List[str] - if target_type == List[str]: - try: - parsed = json.loads(db_value) - if isinstance(parsed, list): - return [str(item) for item in parsed] - except json.JSONDecodeError: + origin_type = get_origin(target_type) + args = get_args(target_type) + + # 处理 List 类型 + if origin_type is list: + # 处理 List[str] + if args and args[0] == str: + try: + parsed = json.loads(db_value) + if isinstance(parsed, list): + return [str(item) for item in parsed] + except json.JSONDecodeError: + return [item.strip() for item in db_value.split(",") if item.strip()] + logger.warning( + f"Could not parse '{db_value}' as List[str] for key '{key}', falling back to comma split or empty list." + ) return [item.strip() for item in db_value.split(",") if item.strip()] - logger.warning( - f"Could not parse '{db_value}' as List[str] for key '{key}', falling back to comma split or empty list." - ) - return [item.strip() for item in db_value.split(",") if item.strip()] - # 处理 Dict[str, float] - elif target_type == Dict[str, float]: - parsed_dict = {} - try: - parsed = json.loads(db_value) - if isinstance(parsed, dict): - parsed_dict = {str(k): float(v) for k, v in parsed.items()} - else: - logger.warning( - f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}" - ) - except (json.JSONDecodeError, ValueError, TypeError) as e1: - if isinstance(e1, json.JSONDecodeError) and "'" in db_value: - logger.warning( - f"Failed initial JSON parse for key '{key}'. Attempting to replace single quotes. Error: {e1}" - ) - try: - corrected_db_value = db_value.replace("'", '"') - parsed = json.loads(corrected_db_value) - if isinstance(parsed, dict): - parsed_dict = {str(k): float(v) for k, v in parsed.items()} + # 处理 List[Dict[str, str]] + elif args and get_origin(args[0]) is dict: + try: + parsed = json.loads(db_value) + if isinstance(parsed, list): + valid = all( + isinstance(item, dict) + and all(isinstance(k, str) for k in item.keys()) + and all(isinstance(v, str) for v in item.values()) + for item in parsed + ) + if valid: + return parsed else: logger.warning( - f"Parsed DB value (after quote replacement) for key '{key}' is not a dictionary type. Value: {corrected_db_value}" + f"Invalid structure in List[Dict[str, str]] for key '{key}'. Value: {db_value}" ) - except (json.JSONDecodeError, ValueError, TypeError) as e2: - logger.error( - f"Could not parse '{db_value}' as Dict[str, float] for key '{key}' even after replacing quotes: {e2}. Returning empty dict." - ) - else: - logger.error( - f"Could not parse '{db_value}' as Dict[str, float] for key '{key}': {e1}. Returning empty dict." - ) - return parsed_dict - # 处理 List[Dict[str, str]] - elif target_type == List[Dict[str, str]]: - try: - parsed = json.loads(db_value) - if isinstance(parsed, list): - # 验证列表中的每个元素是否为字典,并且键和值都是字符串 - valid = all( - isinstance(item, dict) - and all(isinstance(k, str) for k in item.keys()) - and all(isinstance(v, str) for v in item.values()) - for item in parsed - ) - if valid: - return parsed + return [] else: logger.warning( - f"Invalid structure in List[Dict[str, str]] for key '{key}'. Value: {db_value}" + f"Parsed DB value for key '{key}' is not a list type. Value: {db_value}" ) return [] - else: - logger.warning( - f"Parsed DB value for key '{key}' is not a list type. Value: {db_value}" + except json.JSONDecodeError: + logger.error( + f"Could not parse '{db_value}' as JSON for List[Dict[str, str]] for key '{key}'. Returning empty list." ) return [] - except json.JSONDecodeError: - logger.error( - f"Could not parse '{db_value}' as JSON for List[Dict[str, str]] for key '{key}'. Returning empty list." - ) - return [] - except Exception as e: - logger.error( - f"Error parsing List[Dict[str, str]] for key '{key}': {e}. Value: {db_value}. Returning empty list." - ) - return [] + except Exception as e: + logger.error( + f"Error parsing List[Dict[str, str]] for key '{key}': {e}. Value: {db_value}. Returning empty list." + ) + return [] + # 处理 Dict 类型 + elif origin_type is dict: + # 处理 Dict[str, str] + if args and args == (str, str): + parsed_dict = {} + try: + parsed = json.loads(db_value) + if isinstance(parsed, dict): + parsed_dict = {str(k): str(v) for k, v in parsed.items()} + else: + logger.warning( + f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}" + ) + except json.JSONDecodeError: + logger.error(f"Could not parse '{db_value}' as Dict[str, str] for key '{key}'. Returning empty dict.") + return parsed_dict + # 处理 Dict[str, float] + elif args and args == (str, float): + parsed_dict = {} + try: + parsed = json.loads(db_value) + if isinstance(parsed, dict): + parsed_dict = {str(k): float(v) for k, v in parsed.items()} + else: + logger.warning( + f"Parsed DB value for key '{key}' is not a dictionary type. Value: {db_value}" + ) + except (json.JSONDecodeError, ValueError, TypeError) as e1: + if isinstance(e1, json.JSONDecodeError) and "'" in db_value: + logger.warning( + f"Failed initial JSON parse for key '{key}'. Attempting to replace single quotes. Error: {e1}" + ) + try: + corrected_db_value = db_value.replace("'", '"') + parsed = json.loads(corrected_db_value) + if isinstance(parsed, dict): + parsed_dict = {str(k): float(v) for k, v in parsed.items()} + else: + logger.warning( + f"Parsed DB value (after quote replacement) for key '{key}' is not a dictionary type. Value: {corrected_db_value}" + ) + except (json.JSONDecodeError, ValueError, TypeError) as e2: + logger.error( + f"Could not parse '{db_value}' as Dict[str, float] for key '{key}' even after replacing quotes: {e2}. Returning empty dict." + ) + else: + logger.error( + f"Could not parse '{db_value}' as Dict[str, float] for key '{key}': {e1}. Returning empty dict." + ) + return parsed_dict # 处理 bool elif target_type == bool: return db_value.lower() in ("true", "1", "yes", "on") @@ -309,18 +329,12 @@ async def sync_initial_settings(): if parsed_db_value != memory_value: # 检查类型是否匹配,以防解析函数返回了不兼容的类型 type_match = False - if target_type == List[str] and isinstance( - parsed_db_value, list - ): - type_match = True - elif target_type == Dict[str, float] and isinstance( - parsed_db_value, dict - ): - type_match = True - elif target_type not in ( - List[str], - Dict[str, float], - ) and isinstance(parsed_db_value, target_type): + origin_type = get_origin(target_type) + if origin_type: # It's a generic type + if isinstance(parsed_db_value, origin_type): + type_match = True + # It's a non-generic type, or a specific generic we want to handle + elif isinstance(parsed_db_value, target_type): type_match = True if type_match: diff --git a/app/static/js/keys_status.js b/app/static/js/keys_status.js index 42fc54e..bdb3d20 100644 --- a/app/static/js/keys_status.js +++ b/app/static/js/keys_status.js @@ -817,58 +817,11 @@ function toggleSection(header, sectionId) { } } -// 筛选有效密钥(根据失败次数阈值)并更新批量操作状态 +// filterValidKeys 函数已被 filterAndSearchValidKeys 替代,此函数保留为空或可移除 function filterValidKeys() { - const thresholdInput = document.getElementById("failCountThreshold"); - const validKeysList = document.getElementById("validKeys"); // Get the UL element - if (!validKeysList) return; // Exit if the list doesn't exist - - const validKeyItems = validKeysList.querySelectorAll("li[data-key]"); // Select li elements within the list - // 读取阈值,如果输入无效或为空,则默认为0(不过滤) - const threshold = parseInt(thresholdInput.value, 10); - const filterThreshold = isNaN(threshold) || threshold < 0 ? 0 : threshold; - let hasVisibleItems = false; - - validKeyItems.forEach((item) => { - // 确保只处理包含 data-fail-count 的 li 元素 - if (item.dataset.failCount !== undefined) { - const failCount = parseInt(item.dataset.failCount, 10); - // 如果失败次数大于等于阈值,则显示,否则隐藏 - if (failCount >= filterThreshold) { - item.style.display = "flex"; // 使用 flex 因为 li 现在是 flex 容器 - hasVisibleItems = true; - } else { - item.style.display = "none"; // 隐藏 - // 如果隐藏了一个项,取消其选中状态 - const checkbox = item.querySelector(".key-checkbox"); - if (checkbox && checkbox.checked) { - checkbox.checked = false; - } - } - } - }); - - // 更新有效密钥的批量操作状态和全选复选框 - updateBatchActions("valid"); - - // 处理"暂无有效密钥"消息 - const noMatchMsgId = "no-valid-keys-msg"; - let noMatchMsg = validKeysList.querySelector(`#${noMatchMsgId}`); - const initialKeyCount = validKeysList.querySelectorAll("li[data-key]").length; // 获取初始密钥数量 - - if (!hasVisibleItems && initialKeyCount > 0) { - // 仅当初始有密钥但现在都不可见时显示 - if (!noMatchMsg) { - noMatchMsg = document.createElement("li"); - noMatchMsg.id = noMatchMsgId; - noMatchMsg.className = "text-center text-gray-500 py-4 col-span-full"; - noMatchMsg.textContent = "没有符合条件的有效密钥"; - validKeysList.appendChild(noMatchMsg); - } - noMatchMsg.style.display = ""; - } else if (noMatchMsg) { - noMatchMsg.style.display = "none"; - } + // This function is now handled by filterAndSearchValidKeys + // Kept for now to avoid breaking any potential legacy calls, but should be removed later. + filterAndSearchValidKeys(); } // --- Initialization Helper Functions --- diff --git a/app/templates/keys_status.html b/app/templates/keys_status.html index 1c59c81..b7ce5e1 100644 --- a/app/templates/keys_status.html +++ b/app/templates/keys_status.html @@ -1245,6 +1245,7 @@ endblock %} {% block head_extra_styles %} +