From 4c2bb99b59fd7e0fe9af19909e1c104135578a43 Mon Sep 17 00:00:00 2001 From: InfinityPacer <160988576+InfinityPacer@users.noreply.github.com> Date: Fri, 18 Oct 2024 00:39:35 +0800 Subject: [PATCH] refactor(lifecycle): add async support for SSE --- app/api/endpoints/system.py | 88 +++++++++++++++++++++++-------------- requirements.in | 4 +- requirements.txt | 32 ++++++++------ 3 files changed, 74 insertions(+), 50 deletions(-) diff --git a/app/api/endpoints/system.py b/app/api/endpoints/system.py index cf91c33a..3251b2f5 100644 --- a/app/api/endpoints/system.py +++ b/app/api/endpoints/system.py @@ -1,14 +1,15 @@ +import asyncio import io import json import tempfile -import time +from collections import deque from datetime import datetime from pathlib import Path from typing import Optional, Union -import tailer +import aiofiles from PIL import Image -from fastapi import APIRouter, Depends, HTTPException, Header, Response +from fastapi import APIRouter, Depends, HTTPException, Header, Request, Response from fastapi.responses import StreamingResponse from app import schemas @@ -224,19 +225,22 @@ def set_env_setting(env: dict, @router.get("/progress/{process_type}", summary="实时进度") -def get_progress(process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)): +async def get_progress(request: Request, process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)): """ 实时获取处理进度,返回格式为SSE """ progress = ProgressHelper() - def event_generator(): - while True: - if global_vars.is_system_stopped: - break - detail = progress.get(process_type) - yield 'data: %s\n\n' % json.dumps(detail) - time.sleep(0.2) + async def event_generator(): + try: + while not global_vars.is_system_stopped: + if await request.is_disconnected(): + break + detail = progress.get(process_type) + yield f"data: {json.dumps(detail)}\n\n" + await asyncio.sleep(0.2) + except asyncio.CancelledError: + return return StreamingResponse(event_generator(), media_type="text/event-stream") @@ -273,26 +277,29 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None, @router.get("/message", summary="实时消息") -def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)): +async def get_message(request: Request, role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)): """ 实时获取系统消息,返回格式为SSE """ message = MessageHelper() - def event_generator(): - while True: - if global_vars.is_system_stopped: - break - detail = message.get(role) - yield 'data: %s\n\n' % (detail or '') - time.sleep(3) + async def event_generator(): + try: + while not global_vars.is_system_stopped: + if await request.is_disconnected(): + break + detail = message.get(role) + yield f"data: {detail or ''}\n\n" + await asyncio.sleep(3) + except asyncio.CancelledError: + return return StreamingResponse(event_generator(), media_type="text/event-stream") @router.get("/logging", summary="实时日志") -def get_logging(length: int = 50, logfile: str = "moviepilot.log", - _: schemas.TokenPayload = Depends(verify_resource_token)): +async def get_logging(request: Request, length: int = 50, logfile: str = "moviepilot.log", + _: schemas.TokenPayload = Depends(verify_resource_token)): """ 实时获取系统日志 length = -1 时, 返回text/plain @@ -306,27 +313,40 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log", if not log_path.exists() or not log_path.is_file(): raise HTTPException(status_code=404, detail="Not Found") - def log_generator(): - # 读取文件末尾50行,不使用tailer模块 - with open(log_path, 'r', encoding='utf-8') as f: - for line in f.readlines()[-max(length, 50):]: - yield 'data: %s\n\n' % line - while True: - if global_vars.is_system_stopped: - break - for t in tailer.follow(open(log_path, 'r', encoding='utf-8')): - yield 'data: %s\n\n' % (t or '') - time.sleep(1) + async def log_generator(): + try: + # 使用固定大小的双向队列来限制内存使用 + lines_queue = deque(maxlen=max(length, 50)) + # 使用 aiofiles 异步读取文件 + async with aiofiles.open(log_path, mode="r", encoding="utf-8") as f: + # 逐行读取文件,将每一行存入队列 + file_content = await f.read() + for line in file_content.splitlines(): + lines_queue.append(line) + for line in lines_queue: + yield f"data: {line}\n\n" + # 移动文件指针到文件末尾,继续监听新增内容 + await f.seek(0, 2) + while not global_vars.is_system_stopped: + if await request.is_disconnected(): + break + line = await f.readline() + if not line: + await asyncio.sleep(0.5) + continue + yield f"data: {line}\n\n" + except asyncio.CancelledError: + return # 根据length参数返回不同的响应 if length == -1: # 返回全部日志作为文本响应 if not log_path.exists(): return Response(content="日志文件不存在!", media_type="text/plain") - with open(log_path, 'r', encoding='utf-8') as file: + with open(log_path, "r", encoding='utf-8') as file: text = file.read() # 倒序输出 - text = '\n'.join(text.split('\n')[::-1]) + text = "\n".join(text.split("\n")[::-1]) return Response(content=text, media_type="text/plain") else: # 返回SSE流响应 diff --git a/requirements.in b/requirements.in index 82c50a61..d1497279 100644 --- a/requirements.in +++ b/requirements.in @@ -45,7 +45,6 @@ psutil~=5.9.4 python-dotenv~=1.0.1 python-hosts~=1.0.7 watchdog~=3.0.0 -tailer~=0.4.1 openai~=0.27.2 cacheout~=0.14.1 click~=8.1.6 @@ -61,4 +60,5 @@ Pinyin2Hanzi~=0.1.1 pywebpush~=2.0.0 py115j~=0.0.7 oss2~=2.18.6 -aligo~=6.2.4 \ No newline at end of file +aligo~=6.2.4 +aiofiles~=24.1.0 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5d220da2..a6fde3b9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,8 @@ # # pip-compile requirements.in # +aiofiles==24.1.0 + # via -r requirements.in aiohappyeyeballs==2.4.0 # via aiohttp aiohttp==3.10.5 @@ -63,6 +65,11 @@ click==8.1.7 # uvicorn cn2an==0.5.22 # via -r requirements.in +colorama==0.4.6 + # via + # click + # qrcode + # tqdm coloredlogs==15.0.1 # via aligo crcmod==1.7 @@ -95,7 +102,9 @@ frozenlist==1.4.1 func-timeout==4.3.5 # via -r requirements.in greenlet==2.0.2 - # via playwright + # via + # playwright + # sqlalchemy h11==0.14.0 # via # httpcore @@ -139,7 +148,9 @@ openai==0.27.10 oss2==2.18.6 # via -r requirements.in packaging==24.1 - # via docker + # via + # docker + # qbittorrent-api parse==1.19.1 # via -r requirements.in passlib==1.7.4 @@ -181,14 +192,6 @@ pyee==9.0.4 # via playwright pyjwt==2.7.0 # via -r requirements.in -pyobjc-core==10.3.1 - # via - # pyobjc-framework-cocoa - # pyobjc-framework-quartz -pyobjc-framework-cocoa==10.3.1 - # via pyobjc-framework-quartz -pyobjc-framework-quartz==10.3.1 - # via pystray pyotp==2.9.0 # via -r requirements.in pyparsing==3.0.9 @@ -197,6 +200,8 @@ pypng==0.20220715.0 # via qrcode pyquery==2.0.0 # via -r requirements.in +pyreadline3==3.5.4 + # via humanfriendly pysocks==1.7.1 # via requests pystray==0.19.5 @@ -223,7 +228,7 @@ pyvirtualdisplay==3.0 # via -r requirements.in pywebpush==2.0.0 # via -r requirements.in -qbittorrent-api==2023.5.48 +qbittorrent-api==2024.9.67 # via -r requirements.in qrcode[pil]==7.4.2 # via @@ -262,7 +267,6 @@ six==1.16.0 # pystray # python-dateutil # pywebpush - # qbittorrent-api slack-bolt==1.18.0 # via -r requirements.in slack-sdk==3.21.3 @@ -283,8 +287,6 @@ starlette==0.27.0 # via # -r requirements.in # fastapi -tailer==0.4.1 - # via -r requirements.in torrentool==1.2.0 # via -r requirements.in tqdm==4.66.5 @@ -301,6 +303,8 @@ typing-extensions==4.12.2 # qrcode # sqlalchemy # transmission-rpc +tzdata==2024.2 + # via tzlocal tzlocal==5.2 # via # apscheduler