feat: 实现伪流式传输功能

本次提交引入了伪流式传输(Fake Streaming)功能,旨在为不支持原生流式响应的语言模型或特定场景提供类似流式的用户体验。

主要变更包括:

- **配置更新**:
    - 在 `.env.example` 和 `app/config/config.py` 中添加了新的配置项 `FAKE_STREAM_ENABLED` 和 `FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS`,用于控制伪流式功能的启用和心跳包发送间隔。
    - 更新了 `README.md` 以包含新的伪流式配置说明。

- **核心服务逻辑**:
    - 在 `app/service/chat/openai_chat_service.py` 中:
        - 新增 `_fake_stream_logic_impl` 方法,用于处理伪流式调用的核心逻辑。当启用伪流式时,该方法会调用非流式接口,并在等待期间定期发送空数据块以维持连接。
        - 修改 `_handle_stream_completion` 方法,使其能够根据 `FAKE_STREAM_ENABLED` 配置在真实流式和伪流式逻辑之间切换。
        - 改进了流式处理中的重试逻辑、API密钥切换机制以及错误日志记录,使其更加健壮。特别是在伪流式场景下,确保了即使在非流式调用中也能正确处理和记录错误。

- **前端配置界面**:
    - 在 `app/static/js/config_editor.js` 中添加了处理和填充伪流式配置项的逻辑。
    - 在 `app/templates/config_editor.html` 中为伪流式配置添加了相应的表单控件,允许用户在配置编辑器中启用/禁用伪流式并设置空数据发送间隔。

该功能通过在后端模拟流式输出,即使底层模型不支持流式传输,也能向客户端提供持续的数据流,从而改善了用户体验,特别是在处理可能耗时较长的请求时。
This commit is contained in:
snaily
2025-05-08 23:37:35 +08:00
parent b6a54190ed
commit a7d548a849
6 changed files with 372 additions and 79 deletions

View File

@@ -59,6 +59,10 @@ AUTO_DELETE_REQUEST_LOGS_ENABLED=false
AUTO_DELETE_REQUEST_LOGS_DAYS=30
##########################################################################
# 假流式配置 (Fake Streaming Configuration)
FAKE_STREAM_ENABLED=True # 是否启用假流式输出
FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS=5 # 假流式发送空数据的间隔时间(秒)
# 安全设置 (JSON 字符串格式)
# 注意:这里的示例值可能需要根据实际模型支持情况调整
SAFETY_SETTINGS='[{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"}, {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"}, {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}]'

View File

@@ -128,7 +128,7 @@ app/
1. **确保已完成准备工作**:
* 克隆仓库到本地。
* 安装 Python 3.9 或更高版本。
* 在项目根目录下创建并配置好 `.env` 文件 (参考前面的配置环境变量部分)。
* 在项目根目录下创建并配置好 `.env` 文件 (参考前面的"配置环境变量"部分)。
* 安装项目依赖:
```bash
@@ -204,6 +204,9 @@ app/
| `STREAM_SHORT_TEXT_THRESHOLD`| 可选,短文本阈值 | `10` |
| `STREAM_LONG_TEXT_THRESHOLD` | 可选,长文本阈值 | `50` |
| `STREAM_CHUNK_SIZE` | 可选,流式输出块大小 | `5` |
| **伪流式 (Fake Stream) 相关** | | |
| `FAKE_STREAM_ENABLED` | 可选,是否启用伪流式传输,用于不支持流式的模型或场景 | `false` |
| `FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS` | 可选,伪流式传输时发送心跳空数据的间隔秒数 | `5` |
## ⚙️ API 端点

View File

@@ -88,6 +88,10 @@ class Settings(BaseSettings):
STREAM_LONG_TEXT_THRESHOLD: int = DEFAULT_STREAM_LONG_TEXT_THRESHOLD
STREAM_CHUNK_SIZE: int = DEFAULT_STREAM_CHUNK_SIZE
# 假流式配置 (Fake Streaming Configuration)
FAKE_STREAM_ENABLED: bool = False # 是否启用假流式输出
FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS: int = 5 # 假流式发送空数据的间隔时间(秒)
# 调度器配置
CHECK_INTERVAL_HOURS: int = 1 # 默认检查间隔为1小时
TIMEZONE: str = "Asia/Shanghai" # 默认时区

View File

@@ -1,5 +1,6 @@
# app/services/chat_service.py
import asyncio
import datetime
import json
import re
@@ -208,7 +209,11 @@ class OpenAIChatService:
is_success = True
status_code = 200
return self.response_handler.handle_response(
response, model, stream=False, finish_reason="stop", usage_metadata=usage_metadata
response,
model,
stream=False,
finish_reason="stop",
usage_metadata=usage_metadata,
)
except Exception as e:
is_success = False
@@ -242,76 +247,240 @@ class OpenAIChatService:
request_time=request_datetime,
)
async def _fake_stream_logic_impl(
self, model: str, payload: Dict[str, Any], api_key: str
) -> AsyncGenerator[str, None]:
"""处理伪流式 (fake stream) 的核心逻辑"""
logger.info(
f"Fake streaming enabled for model: {model}. Calling non-streaming endpoint."
)
keep_sending_empty_data = True
async def send_empty_data_locally() -> AsyncGenerator[str, None]:
"""定期发送空数据以保持连接"""
while keep_sending_empty_data:
await asyncio.sleep(settings.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS)
if keep_sending_empty_data:
empty_chunk = {
"id": f"chatcmpl-fake-heartbeat-{model}-{time.time()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": None}],
}
yield f"data: {json.dumps(empty_chunk)}\n\\n"
logger.debug("Sent empty data chunk for fake stream heartbeat.")
empty_data_generator = send_empty_data_locally()
api_response_task = asyncio.create_task(
self.api_client.generate_content(payload, model, api_key)
)
try:
while not api_response_task.done():
try:
next_empty_chunk = await asyncio.wait_for(
empty_data_generator.__anext__(), timeout=0.1
)
yield next_empty_chunk
except asyncio.TimeoutError:
pass # Check api_response_task again
except (
StopAsyncIteration
): # Should not happen if keep_sending_empty_data is managed
break
response = await api_response_task # Get API response or exception
finally:
keep_sending_empty_data = False # Stop sending empty data
# Helper to create a base chunk for various scenarios
def create_base_chunk(role_content=""):
return {
"id": f"chatcmpl-fake-response-{model}-{time.time()}",
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"role": "assistant", "content": role_content},
"finish_reason": None,
}
],
}
if response and response.get("candidates"):
candidate = response["candidates"][0]
if candidate.get("content") and candidate["content"].get("parts"):
full_text = "".join(
part.get("text", "")
for part in candidate["content"]["parts"]
if part.get("text")
)
base_chunk_for_text = create_base_chunk()
final_chunk = self._create_char_openai_chunk(
base_chunk_for_text, full_text
)
final_chunk["choices"][0]["finish_reason"] = "stop"
yield f"data: {json.dumps(final_chunk)}\n\\n"
logger.info(f"Sent full response content for fake stream: {model}")
else:
logger.warning(
f"Unexpected response structure (no parts/text) in fake stream for model {model}: {response}"
)
base_chunk_for_empty = create_base_chunk()
empty_final_chunk = self._create_char_openai_chunk(
base_chunk_for_empty, ""
)
empty_final_chunk["choices"][0]["finish_reason"] = "stop"
yield f"data: {json.dumps(empty_final_chunk)}\n\\n"
else:
error_message = "Failed to get response from model"
if (
response and isinstance(response, dict) and response.get("error")
): # Check if response itself is an error structure
# Safely access nested 'message'
error_details = response.get("error")
if isinstance(error_details, dict):
error_message = error_details.get("message", error_message)
logger.error(
f"No candidates or error in response for fake stream model {model}: {response}"
)
base_chunk_for_error = create_base_chunk()
error_chunk = self._create_char_openai_chunk(
base_chunk_for_error, json.dumps({"error": error_message})
)
error_chunk["choices"][0]["finish_reason"] = "stop"
yield f"data: {json.dumps(error_chunk)}\n\\n"
async def _real_stream_logic_impl(
self, model: str, payload: Dict[str, Any], api_key: str
) -> AsyncGenerator[str, None]:
"""处理真实流式 (real stream) 的核心逻辑"""
tool_call_flag = False
async for line in self.api_client.stream_generate_content(
payload, model, api_key
):
if line.startswith("data:"):
chunk_str = line[6:]
if not chunk_str or chunk_str.isspace(): # handle empty data part
logger.debug(
f"Received empty data line for model {model}, skipping."
)
continue
try:
chunk = json.loads(chunk_str)
except json.JSONDecodeError:
logger.error(
f"Failed to decode JSON from stream for model {model}: {chunk_str}"
)
continue # Skip malformed chunk
openai_chunk = self.response_handler.handle_response(
chunk, model, stream=True, finish_reason=None
)
if openai_chunk:
text = self._extract_text_from_openai_chunk(openai_chunk)
if text and settings.STREAM_OPTIMIZER_ENABLED:
async for (
optimized_chunk_data
) in openai_optimizer.optimize_stream_output(
text,
lambda t: self._create_char_openai_chunk(openai_chunk, t),
lambda c: f"data: {json.dumps(c)}\n\\n",
):
yield optimized_chunk_data
else:
# Check for tool_calls more robustly
if openai_chunk.get("choices") and openai_chunk["choices"][
0
].get("delta", {}).get("tool_calls"):
tool_call_flag = True
elif openai_chunk.get("choices") and openai_chunk["choices"][
0
].get("delta", {}).get(
"function_call"
): # For older compatibility
tool_call_flag = True
yield f"data: {json.dumps(openai_chunk)}\n\\n"
if tool_call_flag:
yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='tool_calls'))}\n\\n"
else:
yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='stop'))}\n\\n"
async def _handle_stream_completion(
self, model: str, payload: Dict[str, Any], api_key: str
) -> AsyncGenerator[str, None]:
"""处理流式聊天完成,添加重试逻辑"""
"""处理流式聊天完成,添加重试逻辑和假流式支持"""
retries = 0
max_retries = settings.MAX_RETRIES
is_success = False
status_code = None
final_api_key = api_key
final_api_key = api_key # Initialize with the provided API key
while retries < max_retries:
start_time = time.perf_counter()
request_datetime = datetime.datetime.now()
current_attempt_key = api_key
final_api_key = current_attempt_key
current_attempt_key = (
final_api_key # Use the potentially updated key for this attempt
)
try:
tool_call_flag = False
async for line in self.api_client.stream_generate_content(
payload, model, current_attempt_key
):
# print(line)
if line.startswith("data:"):
chunk = json.loads(line[6:])
openai_chunk = self.response_handler.handle_response(
chunk, model, stream=True, finish_reason=None
)
if openai_chunk:
# 提取文本内容
text = self._extract_text_from_openai_chunk(openai_chunk)
if text and settings.STREAM_OPTIMIZER_ENABLED:
# 使用流式输出优化器处理文本输出
async for (
optimized_chunk
) in openai_optimizer.optimize_stream_output(
text,
lambda t: self._create_char_openai_chunk(
openai_chunk, t
),
lambda c: f"data: {json.dumps(c)}\n\n",
):
yield optimized_chunk
else:
# 如果没有文本内容(如工具调用等),整块输出
if "tool_calls" in json.dumps(openai_chunk):
tool_call_flag = True
yield f"data: {json.dumps(openai_chunk)}\n\n"
if tool_call_flag:
yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='tool_calls'))}\n\n"
stream_generator = None
if settings.FAKE_STREAM_ENABLED:
logger.info(
f"Using fake stream logic for model: {model}, Attempt: {retries + 1}"
)
stream_generator = self._fake_stream_logic_impl(
model, payload, current_attempt_key
)
else:
yield f"data: {json.dumps(self.response_handler.handle_response({}, model, stream=True, finish_reason='stop'))}\n\n"
yield "data: [DONE]\n\n"
logger.info("Streaming completed successfully")
logger.info(
f"Using real stream logic for model: {model}, Attempt: {retries + 1}"
)
stream_generator = self._real_stream_logic_impl(
model, payload, current_attempt_key
)
async for chunk_data in stream_generator:
yield chunk_data
# If the generator completes, it means all its data chunks (including stop/tool_calls) were yielded.
# Now, we send the [DONE] marker for the stream.
yield "data: [DONE]\n\\n"
logger.info(
f"Streaming completed successfully for model: {model}, FakeStream: {settings.FAKE_STREAM_ENABLED}, Attempt: {retries + 1}"
)
is_success = True
status_code = 200
break # 成功后退出循环
break # Successful attempt, exit retry loop
except Exception as e:
retries += 1
is_success = False
is_success = False # Ensure is_success is false for this attempt
error_log_msg = str(e)
logger.warning(
f"Streaming API call failed with error: {error_log_msg}. Attempt {retries} of {max_retries}"
f"Streaming API call failed with error: {error_log_msg}. Attempt {retries} of {max_retries} with key {current_attempt_key}"
)
# Parse error code for logging
match = re.search(r"status code (\d+)", error_log_msg)
match = re.search(r"status code (\\d+)", error_log_msg)
if match:
status_code = int(match.group(1))
else:
status_code = 500
# Distinguish between client-side (e.g., asyncio.TimeoutError) and potential API errors
if isinstance(
e, asyncio.TimeoutError
): # Example, can add more specific client errors
status_code = 408 # Request Timeout
else:
status_code = (
500 # Internal Server Error as default for other exceptions
)
# Log error to error log table
await add_error_log(
gemini_key=current_attempt_key,
model_name=model,
@@ -321,42 +490,54 @@ class OpenAIChatService:
request_msg=payload,
)
# Attempt to switch API Key
# Ensure key_manager is available (might need adjustment if not always passed)
if self.key_manager:
api_key = await self.key_manager.handle_api_failure(
new_api_key = await self.key_manager.handle_api_failure(
current_attempt_key, retries
)
if api_key:
logger.info(f"Switched to new API key: {api_key}")
else:
logger.error(
f"No valid API key available after {retries} retries."
if new_api_key and new_api_key != current_attempt_key:
final_api_key = new_api_key # Update for the NEXT attempt
logger.info(
f"Switched to new API key for next attempt: {final_api_key}"
)
break
elif not new_api_key:
logger.error(
f"No valid API key available after {retries} retries, ceasing attempts for this request."
)
break # No new key, stop retrying
# If new_api_key is the same as current_attempt_key, continue retrying with it if retries < max_retries
else:
logger.error("KeyManager not available for retry logic.")
break
logger.error(
"KeyManager not available, cannot switch API key. Ceasing attempts for this request."
)
break # No KeyManager, stop retrying
if retries >= max_retries:
logger.error(f"Max retries ({max_retries}) reached for streaming.")
break
logger.error(
f"Max retries ({max_retries}) reached for streaming model {model}."
)
# The loop will terminate, and the final error handling outside the loop will take over.
finally:
# Log the final outcome of the streaming request
end_time = time.perf_counter()
latency_ms = int((end_time - start_time) * 1000)
# Log with the key used for THIS specific attempt
await add_request_log(
model_name=model,
api_key=final_api_key,
is_success=is_success,
api_key=current_attempt_key,
is_success=is_success, # This reflects the success of the current attempt
status_code=status_code,
latency_ms=latency_ms,
request_time=request_datetime,
)
# If the loop finished due to failure, yield error and DONE
if not is_success and retries >= max_retries:
yield f"data: {json.dumps({'error': 'Streaming failed after retries'})}\n\n"
yield "data: [DONE]\n\n"
# After the loop, if not successful, yield a final error message and [DONE]
if (
not is_success
): # This 'is_success' is the overall success status after all retries
logger.error(
f"Streaming failed permanently for model {model} after {retries} attempts."
)
yield f"data: {json.dumps({'error': f'Streaming failed after {retries} retries.'})}\n\\n"
yield "data: [DONE]\n\\n"
async def create_image_chat_completion(
self, request: ChatRequest, api_key: str
@@ -384,7 +565,7 @@ class OpenAIChatService:
start_time = time.perf_counter()
request_datetime = datetime.datetime.now()
is_success = False
status_code = None
status_code = None
try:
if image_data:
@@ -418,16 +599,14 @@ class OpenAIChatService:
is_success = False
error_log_msg = f"Stream image completion failed for model {model}: {e}"
logger.error(error_log_msg)
status_code = 500
status_code = 500
await add_error_log(
gemini_key=api_key,
model_name=model,
error_type="openai-image-stream",
error_log=error_log_msg,
error_code=status_code,
request_msg={
"image_data_truncated": image_data[:1000]
},
request_msg={"image_data_truncated": image_data[:1000]},
)
yield f"data: {json.dumps({'error': error_log_msg})}\n\n"
yield "data: [DONE]\n\n"
@@ -447,13 +626,13 @@ class OpenAIChatService:
)
async def _handle_normal_image_completion(
self, model: str, image_data: str, api_key: str
self, model: str, image_data: str, api_key: str
) -> Dict[str, Any]:
logger.info(f"Starting normal image completion for model: {model}")
start_time = time.perf_counter()
request_datetime = datetime.datetime.now()
request_datetime = datetime.datetime.now()
is_success = False
status_code = None
status_code = None
result = None
try:
@@ -477,9 +656,7 @@ class OpenAIChatService:
error_type="openai-image-non-stream",
error_log=error_log_msg,
error_code=status_code,
request_msg={
"image_data_truncated": image_data[:1000]
},
request_msg={"image_data_truncated": image_data[:1000]},
)
# Re-raise the exception so the caller knows about the failure
raise e

View File

@@ -630,6 +630,15 @@ async function initConfig() {
}
// --- 结束:处理自动删除请求日志配置的默认值 ---
// --- 新增:处理假流式配置的默认值 ---
if (typeof config.FAKE_STREAM_ENABLED === "undefined") {
config.FAKE_STREAM_ENABLED = false;
}
if (typeof config.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS === "undefined") {
config.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS = 5;
}
// --- 结束:处理假流式配置的默认值 ---
populateForm(config);
// After populateForm, initialize masking for all populated sensitive fields
if (configForm) {
@@ -664,6 +673,10 @@ async function initConfig() {
AUTO_DELETE_ERROR_LOGS_DAYS: 7, // 新增默认值
AUTO_DELETE_REQUEST_LOGS_ENABLED: false, // 新增默认值
AUTO_DELETE_REQUEST_LOGS_DAYS: 30, // 新增默认值
// --- 新增:处理假流式配置的默认值 ---
FAKE_STREAM_ENABLED: false,
FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS: 5,
// --- 结束:处理假流式配置的默认值 ---
};
populateForm(defaultConfig);
@@ -866,6 +879,26 @@ function populateForm(config) {
});
}
// --- 结束:处理自动删除请求日志的字段 ---
// --- 新增:处理假流式配置的字段 ---
const fakeStreamEnabledCheckbox = document.getElementById(
"FAKE_STREAM_ENABLED"
);
const fakeStreamIntervalInput = document.getElementById(
"FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"
);
if (fakeStreamEnabledCheckbox && fakeStreamIntervalInput) {
fakeStreamEnabledCheckbox.checked = !!config.FAKE_STREAM_ENABLED;
fakeStreamIntervalInput.value =
config.FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS || 5;
// 根据复选框状态设置输入框的禁用状态 (如果需要)
// fakeStreamIntervalInput.disabled = !fakeStreamEnabledCheckbox.checked;
// fakeStreamEnabledCheckbox.addEventListener("change", function () {
// fakeStreamIntervalInput.disabled = !this.checked;
// });
}
// --- 结束:处理假流式配置的字段 ---
}
/**
@@ -1461,6 +1494,24 @@ function collectFormData() {
}
// --- 结束:收集自动删除请求日志的配置 ---
// --- 新增:收集假流式配置 ---
const fakeStreamEnabledCheckbox = document.getElementById(
"FAKE_STREAM_ENABLED"
);
if (fakeStreamEnabledCheckbox) {
formData["FAKE_STREAM_ENABLED"] = fakeStreamEnabledCheckbox.checked;
}
const fakeStreamIntervalInput = document.getElementById(
"FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"
);
if (fakeStreamIntervalInput) {
formData["FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"] = parseInt(
fakeStreamIntervalInput.value,
10
);
}
// --- 结束:收集假流式配置 ---
return formData;
}

View File

@@ -1101,6 +1101,60 @@ endblock %} {% block head_extra_styles %}
/>
<small class="text-gray-500 mt-1 block">流式输出的分块大小</small>
</div>
<!-- Fake Streaming Configuration -->
<h3
class="text-lg font-semibold mb-4 pt-4 border-t border-violet-300 border-opacity-20 text-gray-200"
>
<i class="fas fa-ghost text-violet-400"></i> 假流式配置 (Fake
Streaming)
</h3>
<!-- 启用假流式输出 -->
<div class="mb-6 flex items-center justify-between">
<label for="FAKE_STREAM_ENABLED" class="font-semibold text-gray-700"
>启用假流式输出</label
>
<div
class="relative inline-block w-10 mr-2 align-middle select-none transition duration-200 ease-in"
>
<input
type="checkbox"
name="FAKE_STREAM_ENABLED"
id="FAKE_STREAM_ENABLED"
class="toggle-checkbox absolute block w-6 h-6 rounded-full bg-white border-4 appearance-none cursor-pointer"
/>
<label
for="FAKE_STREAM_ENABLED"
class="toggle-label block overflow-hidden h-6 rounded-full bg-gray-300 cursor-pointer"
></label>
</div>
</div>
<small class="text-gray-500 mt-1 block mb-4"
>当启用时,将调用非流式接口,并在等待响应期间发送空数据以维持连接。</small
>
<!-- 假流式发送空数据的间隔时间 -->
<div class="mb-6">
<label
for="FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"
class="block font-semibold mb-2 text-gray-700"
>假流式空数据发送间隔(秒)</label
>
<input
type="number"
id="FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"
name="FAKE_STREAM_EMPTY_DATA_INTERVAL_SECONDS"
min="1"
max="60"
step="1"
class="w-full px-4 py-3 rounded-lg border border-gray-300 focus:border-primary-500 focus:ring focus:ring-primary-200 focus:ring-opacity-50 form-input-themed"
/>
<small class="text-gray-500 mt-1 block"
>在启用假流式输出时,向客户端发送空数据以维持连接状态的时间间隔(建议
3-10 秒)。</small
>
</div>
</div>
<!-- 定时任务配置 -->