refactor: 代码结构优化与常量化

将日志系统从 app/logger/ 移至 app/log/ 目录
将路由配置从 routers.py 重命名为 routes.py
将硬编码配置值移至 constants.py 中的默认常量
统一代码格式和导入排序
优化函数参数对齐方式
This commit is contained in:
snaily
2025-03-20 21:59:18 +08:00
parent b14bb93d8f
commit b3a057b6ba
21 changed files with 442 additions and 282 deletions

View File

@@ -1,10 +1,11 @@
# app/services/chat/retry_handler.py
from typing import TypeVar, Callable
from functools import wraps
from app.logger.logger import get_retry_logger
from typing import Callable, TypeVar
T = TypeVar('T')
from app.log.logger import get_retry_logger
T = TypeVar("T")
logger = get_retry_logger()
@@ -25,17 +26,21 @@ class RetryHandler:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
logger.warning(f"API call failed with error: {str(e)}. Attempt {attempt + 1} of {self.max_retries}")
logger.warning(
f"API call failed with error: {str(e)}. Attempt {attempt + 1} of {self.max_retries}"
)
# 从函数参数中获取 key_manager
key_manager = kwargs.get('key_manager')
key_manager = kwargs.get("key_manager")
if key_manager:
old_key = kwargs.get(self.key_arg)
new_key = await key_manager.handle_api_failure(old_key)
kwargs[self.key_arg] = new_key
logger.info(f"Switched to new API key: {new_key}")
logger.error(f"All retry attempts failed, raising final exception: {str(last_exception)}")
logger.error(
f"All retry attempts failed, raising final exception: {str(last_exception)}"
)
raise last_exception
return wrapper

View File

@@ -2,10 +2,17 @@
import asyncio
import math
from typing import Any, List, AsyncGenerator, Callable
from app.logger.logger import get_openai_logger, get_gemini_logger
from typing import Any, AsyncGenerator, Callable, List
from app.config.config import settings
from app.core.constants import DEFAULT_STREAM_CHUNK_SIZE, DEFAULT_STREAM_LONG_TEXT_THRESHOLD, DEFAULT_STREAM_MAX_DELAY, DEFAULT_STREAM_MIN_DELAY, DEFAULT_STREAM_SHORT_TEXT_THRESHOLD
from app.core.constants import (
DEFAULT_STREAM_CHUNK_SIZE,
DEFAULT_STREAM_LONG_TEXT_THRESHOLD,
DEFAULT_STREAM_MAX_DELAY,
DEFAULT_STREAM_MIN_DELAY,
DEFAULT_STREAM_SHORT_TEXT_THRESHOLD,
)
from app.log.logger import get_gemini_logger, get_openai_logger
logger_openai = get_openai_logger()
logger_gemini = get_gemini_logger()
@@ -13,19 +20,21 @@ logger_gemini = get_gemini_logger()
class StreamOptimizer:
"""流式输出优化器
提供流式输出优化功能,包括智能延迟调整和长文本分块输出。
"""
def __init__(self,
logger=None,
min_delay: float = DEFAULT_STREAM_MIN_DELAY,
max_delay: float = DEFAULT_STREAM_MAX_DELAY,
short_text_threshold: int = DEFAULT_STREAM_SHORT_TEXT_THRESHOLD,
long_text_threshold: int = DEFAULT_STREAM_LONG_TEXT_THRESHOLD,
chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE):
def __init__(
self,
logger=None,
min_delay: float = DEFAULT_STREAM_MIN_DELAY,
max_delay: float = DEFAULT_STREAM_MAX_DELAY,
short_text_threshold: int = DEFAULT_STREAM_SHORT_TEXT_THRESHOLD,
long_text_threshold: int = DEFAULT_STREAM_LONG_TEXT_THRESHOLD,
chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE,
):
"""初始化流式输出优化器
参数:
logger: 日志记录器
min_delay: 最小延迟时间(秒)
@@ -40,13 +49,13 @@ class StreamOptimizer:
self.short_text_threshold = short_text_threshold
self.long_text_threshold = long_text_threshold
self.chunk_size = chunk_size
def calculate_delay(self, text_length: int) -> float:
"""根据文本长度计算延迟时间
参数:
text_length: 文本长度
返回:
延迟时间(秒)
"""
@@ -59,42 +68,48 @@ class StreamOptimizer:
else:
# 中等长度文本使用线性插值计算延迟
# 使用对数函数使延迟变化更平滑
ratio = math.log(text_length / self.short_text_threshold) / math.log(self.long_text_threshold / self.short_text_threshold)
ratio = math.log(text_length / self.short_text_threshold) / math.log(
self.long_text_threshold / self.short_text_threshold
)
return self.max_delay - ratio * (self.max_delay - self.min_delay)
def split_text_into_chunks(self, text: str) -> List[str]:
"""将文本分割成小块
参数:
text: 要分割的文本
返回:
文本块列表
"""
return [text[i:i+self.chunk_size] for i in range(0, len(text), self.chunk_size)]
async def optimize_stream_output(self,
text: str,
create_response_chunk: Callable[[str], Any],
format_chunk: Callable[[Any], str]) -> AsyncGenerator[str, None]:
return [
text[i : i + self.chunk_size] for i in range(0, len(text), self.chunk_size)
]
async def optimize_stream_output(
self,
text: str,
create_response_chunk: Callable[[str], Any],
format_chunk: Callable[[Any], str],
) -> AsyncGenerator[str, None]:
"""优化流式输出
参数:
text: 要输出的文本
create_response_chunk: 创建响应块的函数,接收文本,返回响应块
format_chunk: 格式化响应块的函数,接收响应块,返回格式化后的字符串
返回:
异步生成器,生成格式化后的响应块
"""
if not text:
return
# 计算智能延迟时间
delay = self.calculate_delay(len(text))
if self.logger:
self.logger.info(f"Text length: {len(text)}, delay: {delay:.4f}s")
# 根据文本长度决定输出方式
if len(text) >= self.long_text_threshold:
# 长文本:分块输出
@@ -120,7 +135,7 @@ openai_optimizer = StreamOptimizer(
max_delay=settings.STREAM_MAX_DELAY,
short_text_threshold=settings.STREAM_SHORT_TEXT_THRESHOLD,
long_text_threshold=settings.STREAM_LONG_TEXT_THRESHOLD,
chunk_size=settings.STREAM_CHUNK_SIZE
chunk_size=settings.STREAM_CHUNK_SIZE,
)
gemini_optimizer = StreamOptimizer(
@@ -129,5 +144,5 @@ gemini_optimizer = StreamOptimizer(
max_delay=settings.STREAM_MAX_DELAY,
short_text_threshold=settings.STREAM_SHORT_TEXT_THRESHOLD,
long_text_threshold=settings.STREAM_LONG_TEXT_THRESHOLD,
chunk_size=settings.STREAM_CHUNK_SIZE
chunk_size=settings.STREAM_CHUNK_SIZE,
)