diff --git a/.env.example b/.env.example index c59f58d..cded0e2 100644 --- a/.env.example +++ b/.env.example @@ -59,6 +59,10 @@ AUTO_DELETE_REQUEST_LOGS_ENABLED=false AUTO_DELETE_REQUEST_LOGS_DAYS=30 ########################################################################## +# 假流式配置 (Fake Streaming Configuration) +FAKE_STREAM_ENABLED=True # 是否启用假流式输出 +FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS=5 # 假流式发送空数据的间隔时间(秒) + # 安全设置 (JSON 字符串格式) # 注意:这里的示例值可能需要根据实际模型支持情况调整 SAFETY_SETTINGS='[{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"}, {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"}, {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}]' diff --git a/README.md b/README.md index 09177f9..16c054b 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ app/ 1. **确保已完成准备工作**: * 克隆仓库到本地。 * 安装 Python 3.9 或更高版本。 - * 在项目根目录下创建并配置好 `.env` 文件 (参考前面的“配置环境变量”部分)。 + * 在项目根目录下创建并配置好 `.env` 文件 (参考前面的"配置环境变量"部分)。 * 安装项目依赖: ```bash @@ -204,6 +204,9 @@ app/ | `STREAM_SHORT_TEXT_THRESHOLD`| 可选,短文本阈值 | `10` | | `STREAM_LONG_TEXT_THRESHOLD` | 可选,长文本阈值 | `50` | | `STREAM_CHUNK_SIZE` | 可选,流式输出块大小 | `5` | +| **伪流式 (Fake Stream) 相关** | | | +| `FAKE_STREAM_ENABLED` | 可选,是否启用伪流式传输,用于不支持流式的模型或场景 | `false` | +| `FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS` | 可选,伪流式传输时发送心跳空数据的间隔秒数 | `5` | ## ⚙️ API 端点 diff --git a/app/config/config.py b/app/config/config.py index c8b39b8..4e688dc 100644 --- a/app/config/config.py +++ b/app/config/config.py @@ -88,6 +88,10 @@ class Settings(BaseSettings): STREAM_LONG_TEXT_THRESHOLD: int = DEFAULT_STREAM_LONG_TEXT_THRESHOLD STREAM_CHUNK_SIZE: int = DEFAULT_STREAM_CHUNK_SIZE + # 假流式配置 (Fake Streaming Configuration) + FAKE_STREAM_ENABLED: bool = False # 是否启用假流式输出 + FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS: int = 5 # 假流式发送空数据的间隔时间(秒) + # 调度器配置 CHECK_INTERVAL_HOURS: int = 1 # 默认检查间隔为1小时 TIMEZONE: str = "Asia/Shanghai" # 默认时区 diff --git a/app/service/chat/openai_chat_service.py b/app/service/chat/openai_chat_service.py index 33c5f8b..a89102e 100644 --- a/app/service/chat/openai_chat_service.py +++ b/app/service/chat/openai_chat_service.py @@ -1,5 +1,6 @@ # app/services/chat_service.py +import asyncio import datetime import json import re @@ -208,7 +209,11 @@ class OpenAIChatService: is_success = True status_code = 200 return self.response_handler.handle_response( - response, model, stream=False, finish_reason="stop", usage_metadata=usage_metadata + response, + model, + stream=False, + finish_reason="stop", + usage_metadata=usage_metadata, ) except Exception as e: is_success = False @@ -242,76 +247,240 @@ class OpenAIChatService: request_time=request_datetime, ) + async def _fake_stream_logic_impl( + self, model: str, payload: Dict[str, Any], api_key: str + ) -> AsyncGenerator[str, None]: + """处理伪流式 (fake stream) 的核心逻辑""" + logger.info( + f"Fake streaming enabled for model: {model}. Calling non-streaming endpoint." + ) + keep_sending_empty_data = True + + async def send_empty_data_locally() -> AsyncGenerator[str, None]: + """定期发送空数据以保持连接""" + while keep_sending_empty_data: + await asyncio.sleep(settings.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS) + if keep_sending_empty_data: + empty_chunk = { + "id": f"chatcmpl-fake-heartbeat-{model}-{time.time()}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [{"index": 0, "delta": {}, "finish_reason": None}], + } + yield f"data: {json.dumps(empty_chunk)}\n\\n" + logger.debug("Sent empty data chunk for fake stream heartbeat.") + + empty_data_generator = send_empty_data_locally() + api_response_task = asyncio.create_task( + self.api_client.generate_content(payload, model, api_key) + ) + + try: + while not api_response_task.done(): + try: + next_empty_chunk = await asyncio.wait_for( + empty_data_generator.__anext__(), timeout=0.1 + ) + yield next_empty_chunk + except asyncio.TimeoutError: + pass # Check api_response_task again + except ( + StopAsyncIteration + ): # Should not happen if keep_sending_empty_data is managed + break + + response = await api_response_task # Get API response or exception + finally: + keep_sending_empty_data = False # Stop sending empty data + + # Helper to create a base chunk for various scenarios + def create_base_chunk(role_content=""): + return { + "id": f"chatcmpl-fake-response-{model}-{time.time()}", + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model, + "choices": [ + { + "index": 0, + "delta": {"role": "assistant", "content": role_content}, + "finish_reason": None, + } + ], + } + + if response and response.get("candidates"): + candidate = response["candidates"][0] + if candidate.get("content") and candidate["content"].get("parts"): + full_text = "".join( + part.get("text", "") + for part in candidate["content"]["parts"] + if part.get("text") + ) + base_chunk_for_text = create_base_chunk() + final_chunk = self._create_char_openai_chunk( + base_chunk_for_text, full_text + ) + final_chunk["choices"][0]["finish_reason"] = "stop" + yield f"data: {json.dumps(final_chunk)}\n\\n" + logger.info(f"Sent full response content for fake stream: {model}") + else: + logger.warning( + f"Unexpected response structure (no parts/text) in fake stream for model {model}: {response}" + ) + base_chunk_for_empty = create_base_chunk() + empty_final_chunk = self._create_char_openai_chunk( + base_chunk_for_empty, "" + ) + empty_final_chunk["choices"][0]["finish_reason"] = "stop" + yield f"data: {json.dumps(empty_final_chunk)}\n\\n" + else: + error_message = "Failed to get response from model" + if ( + response and isinstance(response, dict) and response.get("error") + ): # Check if response itself is an error structure + # Safely access nested 'message' + error_details = response.get("error") + if isinstance(error_details, dict): + error_message = error_details.get("message", error_message) + + logger.error( + f"No candidates or error in response for fake stream model {model}: {response}" + ) + base_chunk_for_error = create_base_chunk() + error_chunk = self._create_char_openai_chunk( + base_chunk_for_error, json.dumps({"error": error_message}) + ) + error_chunk["choices"][0]["finish_reason"] = "stop" + yield f"data: {json.dumps(error_chunk)}\n\\n" + + async def _real_stream_logic_impl( + self, model: str, payload: Dict[str, Any], api_key: str + ) -> AsyncGenerator[str, None]: + """处理真实流式 (real stream) 的核心逻辑""" + tool_call_flag = False + async for line in self.api_client.stream_generate_content( + payload, model, api_key + ): + if line.startswith("data:"): + chunk_str = line[6:] + if not chunk_str or chunk_str.isspace(): # handle empty data part + logger.debug( + f"Received empty data line for model {model}, skipping." + ) + continue + try: + chunk = json.loads(chunk_str) + except json.JSONDecodeError: + logger.error( + f"Failed to decode JSON from stream for model {model}: {chunk_str}" + ) + continue # Skip malformed chunk + + openai_chunk = self.response_handler.handle_response( + chunk, model, stream=True, finish_reason=None + ) + if openai_chunk: + text = self._extract_text_from_openai_chunk(openai_chunk) + if text and settings.STREAM_OPTIMIZER_ENABLED: + async for ( + optimized_chunk_data + ) in openai_optimizer.optimize_stream_output( + text, + lambda t: self._create_char_openai_chunk(openai_chunk, t), + lambda c: f"data: {json.dumps(c)}\n\\n", + ): + yield optimized_chunk_data + else: + # Check for tool_calls more robustly + if openai_chunk.get("choices") and openai_chunk["choices"][ + 0 + ].get("delta", {}).get("tool_calls"): + tool_call_flag = True + elif openai_chunk.get("choices") and openai_chunk["choices"][ + 0 + ].get("delta", {}).get( + "function_call" + ): # For older compatibility + tool_call_flag = True + + yield f"data: {json.dumps(openai_chunk)}\n\\n" + + if tool_call_flag: + yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='tool_calls'))}\n\\n" + else: + yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='stop'))}\n\\n" + async def _handle_stream_completion( self, model: str, payload: Dict[str, Any], api_key: str ) -> AsyncGenerator[str, None]: - """处理流式聊天完成,添加重试逻辑""" + """处理流式聊天完成,添加重试逻辑和假流式支持""" retries = 0 max_retries = settings.MAX_RETRIES is_success = False status_code = None - final_api_key = api_key + final_api_key = api_key # Initialize with the provided API key while retries < max_retries: start_time = time.perf_counter() request_datetime = datetime.datetime.now() - current_attempt_key = api_key - final_api_key = current_attempt_key + current_attempt_key = ( + final_api_key # Use the potentially updated key for this attempt + ) + try: - tool_call_flag = False - async for line in self.api_client.stream_generate_content( - payload, model, current_attempt_key - ): - # print(line) - if line.startswith("data:"): - chunk = json.loads(line[6:]) - openai_chunk = self.response_handler.handle_response( - chunk, model, stream=True, finish_reason=None - ) - if openai_chunk: - # 提取文本内容 - text = self._extract_text_from_openai_chunk(openai_chunk) - if text and settings.STREAM_OPTIMIZER_ENABLED: - # 使用流式输出优化器处理文本输出 - async for ( - optimized_chunk - ) in openai_optimizer.optimize_stream_output( - text, - lambda t: self._create_char_openai_chunk( - openai_chunk, t - ), - lambda c: f"data: {json.dumps(c)}\n\n", - ): - yield optimized_chunk - else: - # 如果没有文本内容(如工具调用等),整块输出 - if "tool_calls" in json.dumps(openai_chunk): - tool_call_flag = True - yield f"data: {json.dumps(openai_chunk)}\n\n" - if tool_call_flag: - yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='tool_calls'))}\n\n" + stream_generator = None + if settings.FAKE_STREAM_ENABLED: + logger.info( + f"Using fake stream logic for model: {model}, Attempt: {retries + 1}" + ) + stream_generator = self._fake_stream_logic_impl( + model, payload, current_attempt_key + ) else: - yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='stop'))}\n\n" - yield "data: [DONE]\n\n" - logger.info("Streaming completed successfully") + logger.info( + f"Using real stream logic for model: {model}, Attempt: {retries + 1}" + ) + stream_generator = self._real_stream_logic_impl( + model, payload, current_attempt_key + ) + + async for chunk_data in stream_generator: + yield chunk_data + + # If the generator completes, it means all its data chunks (including stop/tool_calls) were yielded. + # Now, we send the [DONE] marker for the stream. + yield "data: [DONE]\n\\n" + logger.info( + f"Streaming completed successfully for model: {model}, FakeStream: {settings.FAKE_STREAM_ENABLED}, Attempt: {retries + 1}" + ) is_success = True status_code = 200 - break # 成功后退出循环 + break # Successful attempt, exit retry loop + except Exception as e: retries += 1 - is_success = False + is_success = False # Ensure is_success is false for this attempt error_log_msg = str(e) logger.warning( - f"Streaming API call failed with error: {error_log_msg}. Attempt {retries} of {max_retries}" + f"Streaming API call failed with error: {error_log_msg}. Attempt {retries} of {max_retries} with key {current_attempt_key}" ) - # Parse error code for logging - match = re.search(r"status code (\d+)", error_log_msg) + + match = re.search(r"status code (\\d+)", error_log_msg) if match: status_code = int(match.group(1)) else: - status_code = 500 + # Distinguish between client-side (e.g., asyncio.TimeoutError) and potential API errors + if isinstance( + e, asyncio.TimeoutError + ): # Example, can add more specific client errors + status_code = 408 # Request Timeout + else: + status_code = ( + 500 # Internal Server Error as default for other exceptions + ) - # Log error to error log table await add_error_log( gemini_key=current_attempt_key, model_name=model, @@ -321,42 +490,54 @@ class OpenAIChatService: request_msg=payload, ) - # Attempt to switch API Key - # Ensure key_manager is available (might need adjustment if not always passed) if self.key_manager: - api_key = await self.key_manager.handle_api_failure( + new_api_key = await self.key_manager.handle_api_failure( current_attempt_key, retries ) - if api_key: - logger.info(f"Switched to new API key: {api_key}") - else: - logger.error( - f"No valid API key available after {retries} retries." + if new_api_key and new_api_key != current_attempt_key: + final_api_key = new_api_key # Update for the NEXT attempt + logger.info( + f"Switched to new API key for next attempt: {final_api_key}" ) - break + elif not new_api_key: + logger.error( + f"No valid API key available after {retries} retries, ceasing attempts for this request." + ) + break # No new key, stop retrying + # If new_api_key is the same as current_attempt_key, continue retrying with it if retries < max_retries else: - logger.error("KeyManager not available for retry logic.") - break + logger.error( + "KeyManager not available, cannot switch API key. Ceasing attempts for this request." + ) + break # No KeyManager, stop retrying if retries >= max_retries: - logger.error(f"Max retries ({max_retries}) reached for streaming.") - break + logger.error( + f"Max retries ({max_retries}) reached for streaming model {model}." + ) + # The loop will terminate, and the final error handling outside the loop will take over. finally: - # Log the final outcome of the streaming request end_time = time.perf_counter() latency_ms = int((end_time - start_time) * 1000) + # Log with the key used for THIS specific attempt await add_request_log( model_name=model, - api_key=final_api_key, - is_success=is_success, + api_key=current_attempt_key, + is_success=is_success, # This reflects the success of the current attempt status_code=status_code, latency_ms=latency_ms, request_time=request_datetime, ) - # If the loop finished due to failure, yield error and DONE - if not is_success and retries >= max_retries: - yield f"data: {json.dumps({'error': 'Streaming failed after retries'})}\n\n" - yield "data: [DONE]\n\n" + + # After the loop, if not successful, yield a final error message and [DONE] + if ( + not is_success + ): # This 'is_success' is the overall success status after all retries + logger.error( + f"Streaming failed permanently for model {model} after {retries} attempts." + ) + yield f"data: {json.dumps({'error': f'Streaming failed after {retries} retries.'})}\n\\n" + yield "data: [DONE]\n\\n" async def create_image_chat_completion( self, request: ChatRequest, api_key: str @@ -384,7 +565,7 @@ class OpenAIChatService: start_time = time.perf_counter() request_datetime = datetime.datetime.now() is_success = False - status_code = None + status_code = None try: if image_data: @@ -418,16 +599,14 @@ class OpenAIChatService: is_success = False error_log_msg = f"Stream image completion failed for model {model}: {e}" logger.error(error_log_msg) - status_code = 500 + status_code = 500 await add_error_log( gemini_key=api_key, model_name=model, error_type="openai-image-stream", error_log=error_log_msg, error_code=status_code, - request_msg={ - "image_data_truncated": image_data[:1000] - }, + request_msg={"image_data_truncated": image_data[:1000]}, ) yield f"data: {json.dumps({'error': error_log_msg})}\n\n" yield "data: [DONE]\n\n" @@ -447,13 +626,13 @@ class OpenAIChatService: ) async def _handle_normal_image_completion( - self, model: str, image_data: str, api_key: str + self, model: str, image_data: str, api_key: str ) -> Dict[str, Any]: logger.info(f"Starting normal image completion for model: {model}") start_time = time.perf_counter() - request_datetime = datetime.datetime.now() + request_datetime = datetime.datetime.now() is_success = False - status_code = None + status_code = None result = None try: @@ -477,9 +656,7 @@ class OpenAIChatService: error_type="openai-image-non-stream", error_log=error_log_msg, error_code=status_code, - request_msg={ - "image_data_truncated": image_data[:1000] - }, + request_msg={"image_data_truncated": image_data[:1000]}, ) # Re-raise the exception so the caller knows about the failure raise e diff --git a/app/static/js/config_editor.js b/app/static/js/config_editor.js index e27ea8b..ac1471d 100644 --- a/app/static/js/config_editor.js +++ b/app/static/js/config_editor.js @@ -630,6 +630,15 @@ async function initConfig() { } // --- 结束:处理自动删除请求日志配置的默认值 --- + // --- 新增:处理假流式配置的默认值 --- + if (typeof config.FAKE_STREAM_ENABLED === "undefined") { + config.FAKE_STREAM_ENABLED = false; + } + if (typeof config.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS === "undefined") { + config.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS = 5; + } + // --- 结束:处理假流式配置的默认值 --- + populateForm(config); // After populateForm, initialize masking for all populated sensitive fields if (configForm) { @@ -664,6 +673,10 @@ async function initConfig() { AUTO_DELETE_ERROR_LOGS_DAYS: 7, // 新增默认值 AUTO_DELETE_REQUEST_LOGS_ENABLED: false, // 新增默认值 AUTO_DELETE_REQUEST_LOGS_DAYS: 30, // 新增默认值 + // --- 新增:处理假流式配置的默认值 --- + FAKE_STREAM_ENABLED: false, + FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS: 5, + // --- 结束:处理假流式配置的默认值 --- }; populateForm(defaultConfig); @@ -866,6 +879,26 @@ function populateForm(config) { }); } // --- 结束:处理自动删除请求日志的字段 --- + + // --- 新增:处理假流式配置的字段 --- + const fakeStreamEnabledCheckbox = document.getElementById( + "FAKE_STREAM_ENABLED" + ); + const fakeStreamIntervalInput = document.getElementById( + "FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS" + ); + + if (fakeStreamEnabledCheckbox && fakeStreamIntervalInput) { + fakeStreamEnabledCheckbox.checked = !!config.FAKE_STREAM_ENABLED; + fakeStreamIntervalInput.value = + config.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS || 5; + // 根据复选框状态设置输入框的禁用状态 (如果需要) + // fakeStreamIntervalInput.disabled = !fakeStreamEnabledCheckbox.checked; + // fakeStreamEnabledCheckbox.addEventListener("change", function () { + // fakeStreamIntervalInput.disabled = !this.checked; + // }); + } + // --- 结束:处理假流式配置的字段 --- } /** @@ -1461,6 +1494,24 @@ function collectFormData() { } // --- 结束:收集自动删除请求日志的配置 --- + // --- 新增:收集假流式配置 --- + const fakeStreamEnabledCheckbox = document.getElementById( + "FAKE_STREAM_ENABLED" + ); + if (fakeStreamEnabledCheckbox) { + formData["FAKE_STREAM_ENABLED"] = fakeStreamEnabledCheckbox.checked; + } + const fakeStreamIntervalInput = document.getElementById( + "FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS" + ); + if (fakeStreamIntervalInput) { + formData["FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"] = parseInt( + fakeStreamIntervalInput.value, + 10 + ); + } + // --- 结束:收集假流式配置 --- + return formData; } diff --git a/app/templates/config_editor.html b/app/templates/config_editor.html index e4761a8..671049d 100644 --- a/app/templates/config_editor.html +++ b/app/templates/config_editor.html @@ -1101,6 +1101,60 @@ endblock %} {% block head_extra_styles %} /> 流式输出的分块大小 + + +