refactor(lifecycle): add async support for SSE

This commit is contained in:
InfinityPacer
2024-10-18 00:39:35 +08:00
parent 348923aaa6
commit 4c2bb99b59
3 changed files with 74 additions and 50 deletions

View File

@@ -1,14 +1,15 @@
import asyncio
import io
import json
import tempfile
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Optional, Union
import tailer
import aiofiles
from PIL import Image
from fastapi import APIRouter, Depends, HTTPException, Header, Response
from fastapi import APIRouter, Depends, HTTPException, Header, Request, Response
from fastapi.responses import StreamingResponse
from app import schemas
@@ -224,19 +225,22 @@ def set_env_setting(env: dict,
@router.get("/progress/{process_type}", summary="实时进度")
def get_progress(process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_progress(request: Request, process_type: str, _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取处理进度返回格式为SSE
"""
progress = ProgressHelper()
def event_generator():
while True:
if global_vars.is_system_stopped:
break
detail = progress.get(process_type)
yield 'data: %s\n\n' % json.dumps(detail)
time.sleep(0.2)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = progress.get(process_type)
yield f"data: {json.dumps(detail)}\n\n"
await asyncio.sleep(0.2)
except asyncio.CancelledError:
return
return StreamingResponse(event_generator(), media_type="text/event-stream")
@@ -273,26 +277,29 @@ def set_setting(key: str, value: Union[list, dict, bool, int, str] = None,
@router.get("/message", summary="实时消息")
def get_message(role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_message(request: Request, role: str = "system", _: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统消息返回格式为SSE
"""
message = MessageHelper()
def event_generator():
while True:
if global_vars.is_system_stopped:
break
detail = message.get(role)
yield 'data: %s\n\n' % (detail or '')
time.sleep(3)
async def event_generator():
try:
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
detail = message.get(role)
yield f"data: {detail or ''}\n\n"
await asyncio.sleep(3)
except asyncio.CancelledError:
return
return StreamingResponse(event_generator(), media_type="text/event-stream")
@router.get("/logging", summary="实时日志")
def get_logging(length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
async def get_logging(request: Request, length: int = 50, logfile: str = "moviepilot.log",
_: schemas.TokenPayload = Depends(verify_resource_token)):
"""
实时获取系统日志
length = -1 时, 返回text/plain
@@ -306,27 +313,40 @@ def get_logging(length: int = 50, logfile: str = "moviepilot.log",
if not log_path.exists() or not log_path.is_file():
raise HTTPException(status_code=404, detail="Not Found")
def log_generator():
# 读取文件末尾50行不使用tailer模块
with open(log_path, 'r', encoding='utf-8') as f:
for line in f.readlines()[-max(length, 50):]:
yield 'data: %s\n\n' % line
while True:
if global_vars.is_system_stopped:
break
for t in tailer.follow(open(log_path, 'r', encoding='utf-8')):
yield 'data: %s\n\n' % (t or '')
time.sleep(1)
async def log_generator():
try:
# 使用固定大小的双向队列来限制内存使用
lines_queue = deque(maxlen=max(length, 50))
# 使用 aiofiles 异步读取文件
async with aiofiles.open(log_path, mode="r", encoding="utf-8") as f:
# 逐行读取文件,将每一行存入队列
file_content = await f.read()
for line in file_content.splitlines():
lines_queue.append(line)
for line in lines_queue:
yield f"data: {line}\n\n"
# 移动文件指针到文件末尾,继续监听新增内容
await f.seek(0, 2)
while not global_vars.is_system_stopped:
if await request.is_disconnected():
break
line = await f.readline()
if not line:
await asyncio.sleep(0.5)
continue
yield f"data: {line}\n\n"
except asyncio.CancelledError:
return
# 根据length参数返回不同的响应
if length == -1:
# 返回全部日志作为文本响应
if not log_path.exists():
return Response(content="日志文件不存在!", media_type="text/plain")
with open(log_path, 'r', encoding='utf-8') as file:
with open(log_path, "r", encoding='utf-8') as file:
text = file.read()
# 倒序输出
text = '\n'.join(text.split('\n')[::-1])
text = "\n".join(text.split("\n")[::-1])
return Response(content=text, media_type="text/plain")
else:
# 返回SSE流响应

View File

@@ -45,7 +45,6 @@ psutil~=5.9.4
python-dotenv~=1.0.1
python-hosts~=1.0.7
watchdog~=3.0.0
tailer~=0.4.1
openai~=0.27.2
cacheout~=0.14.1
click~=8.1.6
@@ -61,4 +60,5 @@ Pinyin2Hanzi~=0.1.1
pywebpush~=2.0.0
py115j~=0.0.7
oss2~=2.18.6
aligo~=6.2.4
aligo~=6.2.4
aiofiles~=24.1.0

View File

@@ -4,6 +4,8 @@
#
# pip-compile requirements.in
#
aiofiles==24.1.0
# via -r requirements.in
aiohappyeyeballs==2.4.0
# via aiohttp
aiohttp==3.10.5
@@ -63,6 +65,11 @@ click==8.1.7
# uvicorn
cn2an==0.5.22
# via -r requirements.in
colorama==0.4.6
# via
# click
# qrcode
# tqdm
coloredlogs==15.0.1
# via aligo
crcmod==1.7
@@ -95,7 +102,9 @@ frozenlist==1.4.1
func-timeout==4.3.5
# via -r requirements.in
greenlet==2.0.2
# via playwright
# via
# playwright
# sqlalchemy
h11==0.14.0
# via
# httpcore
@@ -139,7 +148,9 @@ openai==0.27.10
oss2==2.18.6
# via -r requirements.in
packaging==24.1
# via docker
# via
# docker
# qbittorrent-api
parse==1.19.1
# via -r requirements.in
passlib==1.7.4
@@ -181,14 +192,6 @@ pyee==9.0.4
# via playwright
pyjwt==2.7.0
# via -r requirements.in
pyobjc-core==10.3.1
# via
# pyobjc-framework-cocoa
# pyobjc-framework-quartz
pyobjc-framework-cocoa==10.3.1
# via pyobjc-framework-quartz
pyobjc-framework-quartz==10.3.1
# via pystray
pyotp==2.9.0
# via -r requirements.in
pyparsing==3.0.9
@@ -197,6 +200,8 @@ pypng==0.20220715.0
# via qrcode
pyquery==2.0.0
# via -r requirements.in
pyreadline3==3.5.4
# via humanfriendly
pysocks==1.7.1
# via requests
pystray==0.19.5
@@ -223,7 +228,7 @@ pyvirtualdisplay==3.0
# via -r requirements.in
pywebpush==2.0.0
# via -r requirements.in
qbittorrent-api==2023.5.48
qbittorrent-api==2024.9.67
# via -r requirements.in
qrcode[pil]==7.4.2
# via
@@ -262,7 +267,6 @@ six==1.16.0
# pystray
# python-dateutil
# pywebpush
# qbittorrent-api
slack-bolt==1.18.0
# via -r requirements.in
slack-sdk==3.21.3
@@ -283,8 +287,6 @@ starlette==0.27.0
# via
# -r requirements.in
# fastapi
tailer==0.4.1
# via -r requirements.in
torrentool==1.2.0
# via -r requirements.in
tqdm==4.66.5
@@ -301,6 +303,8 @@ typing-extensions==4.12.2
# qrcode
# sqlalchemy
# transmission-rpc
tzdata==2024.2
# via tzlocal
tzlocal==5.2
# via
# apscheduler