feat: switch watch sync from polling to websocket

This commit is contained in:
时雨
2026-05-15 20:49:17 +08:00
parent d5a24c69e1
commit f900bcf2ca
10 changed files with 508 additions and 1 deletions

View File

@@ -0,0 +1,3 @@
from .service import VideoRoomService
__all__ = ["VideoRoomService"]

97
domain/video_room/api.py Normal file
View File

@@ -0,0 +1,97 @@
from typing import Annotated
from fastapi import APIRouter, Depends, Request, WebSocket, WebSocketDisconnect
from api.response import success
from domain.auth import User, get_current_active_user
from domain.video_room.service import VideoRoomService
from domain.video_room.types import PlaybackEvent, VideoRoomCreate, VideoRoomInfo
router = APIRouter(prefix="/api/video-rooms", tags=["Video Rooms"])
public_router = APIRouter(prefix="/api/watch", tags=["Video Rooms - Public"])
@router.post("", response_model=VideoRoomInfo)
async def create_video_room(
request: Request,
payload: VideoRoomCreate,
current_user: Annotated[User, Depends(get_current_active_user)],
):
room = await VideoRoomService.create_room(
user_id=current_user.id,
path=payload.path,
name=payload.name,
expires_in_days=payload.expires_in_days,
control_mode=payload.control_mode,
)
return VideoRoomInfo(
id=room.id,
name=room.name,
token=room.token,
path=room.path,
control_mode=room.control_mode,
created_at=room.created_at.isoformat(),
expires_at=room.expires_at.isoformat() if room.expires_at else None,
)
@public_router.get("/{token}")
async def get_watch_room(request: Request, token: str):
room = await VideoRoomService.get_room_by_token(token)
state = await VideoRoomService.get_state(room.id)
return success({"room": VideoRoomInfo(
id=room.id,
name=room.name,
token=room.token,
path=room.path,
control_mode=room.control_mode,
created_at=room.created_at.isoformat(),
expires_at=room.expires_at.isoformat() if room.expires_at else None,
).model_dump(), "playback": state})
@public_router.websocket("/{token}/ws")
async def watch_room_ws(websocket: WebSocket, token: str):
room = await VideoRoomService.get_room_by_token(token)
actor = websocket.query_params.get("actor") or "guest"
await VideoRoomService.ws_connect(room.id, websocket)
try:
state = await VideoRoomService.get_state(room.id)
await websocket.send_json({"type": "snapshot", "playback": state})
while True:
msg = await websocket.receive_json()
if msg.get("type") == "ping":
await websocket.send_json({"type": "pong"})
continue
event_type = msg.get("event")
if event_type not in {"play", "pause", "seek", "rate"}:
continue
position_ms = int(msg.get("position_ms") or 0)
playback_rate = float(msg.get("playback_rate") or 1.0)
state = await VideoRoomService.apply_event(
room=room,
actor=actor,
event_type=event_type,
position_ms=position_ms,
playback_rate=playback_rate,
)
await VideoRoomService.ws_broadcast(room.id, {"type": "playback", "event": event_type, "playback": state, "actor": actor})
except WebSocketDisconnect:
pass
finally:
await VideoRoomService.ws_disconnect(room.id, websocket)
@public_router.post("/{token}/events")
async def push_watch_event(request: Request, token: str, payload: PlaybackEvent):
room = await VideoRoomService.get_room_by_token(token)
actor = request.headers.get("X-Watch-Actor", "guest")
state = await VideoRoomService.apply_event(
room=room,
actor=actor,
event_type=payload.type,
position_ms=payload.position_ms,
playback_rate=payload.playback_rate,
)
await VideoRoomService.ws_broadcast(room.id, {"type": "playback", "event": payload.type, "playback": state, "actor": actor})
return success({"playback": state})

View File

@@ -0,0 +1,142 @@
import asyncio
import json
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import HTTPException, WebSocket
from starlette.websockets import WebSocketState
from models.database import VideoRoom
VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".m4v"}
class VideoRoomService:
_runtime_states: dict[int, dict] = {}
_room_clients: dict[int, set[WebSocket]] = {}
_lock = asyncio.Lock()
@classmethod
def _now(cls) -> datetime:
return datetime.now(timezone.utc)
@classmethod
def _iso_now(cls) -> str:
return cls._now().isoformat()
@classmethod
def _ensure_video_path(cls, path: str) -> None:
p = path.lower()
if not any(p.endswith(ext) for ext in VIDEO_EXTS):
raise HTTPException(status_code=400, detail="仅支持视频文件创建视频间")
@classmethod
def _calc_expires_at(cls, expires_in_days: Optional[int]) -> Optional[datetime]:
if expires_in_days is None or expires_in_days <= 0:
return None
return cls._now() + timedelta(days=expires_in_days)
@classmethod
async def create_room(cls, *, user_id: int, path: str, name: Optional[str], expires_in_days: Optional[int], control_mode: str):
cls._ensure_video_path(path)
token = secrets.token_urlsafe(18)
room_name = name or f"{path.split('/')[-1]} 的视频间"
room = await VideoRoom.create(
token=token,
name=room_name,
path=path,
owner_id=user_id,
control_mode=control_mode,
expires_at=cls._calc_expires_at(expires_in_days),
)
cls._runtime_states[room.id] = {
"position_ms": 0,
"is_paused": True,
"playback_rate": 1.0,
"updated_at": cls._iso_now(),
"updated_by": f"user:{user_id}",
}
return room
@classmethod
async def get_room_by_token(cls, token: str) -> VideoRoom:
room = await VideoRoom.get_or_none(token=token)
if not room:
raise HTTPException(status_code=404, detail="视频间不存在")
if room.expires_at and room.expires_at < cls._now():
raise HTTPException(status_code=410, detail="视频间已过期")
return room
@classmethod
async def get_state(cls, room_id: int) -> dict:
return cls._runtime_states.setdefault(
room_id,
{
"position_ms": 0,
"is_paused": True,
"playback_rate": 1.0,
"updated_at": cls._iso_now(),
"updated_by": "system",
},
)
@classmethod
async def apply_event(cls, *, room: VideoRoom, actor: str, event_type: str, position_ms: int, playback_rate: float):
state = await cls.get_state(room.id)
if room.control_mode == "host_only" and actor != f"user:{room.owner_id}":
raise HTTPException(status_code=403, detail="仅房主可控制播放")
if event_type == "play":
state["is_paused"] = False
elif event_type == "pause":
state["is_paused"] = True
elif event_type == "seek":
state["position_ms"] = max(position_ms, 0)
elif event_type == "rate":
state["playback_rate"] = playback_rate
state["updated_at"] = cls._iso_now()
state["updated_by"] = actor
return state
@classmethod
async def ws_connect(cls, room_id: int, websocket: WebSocket):
await websocket.accept()
async with cls._lock:
clients = cls._room_clients.setdefault(room_id, set())
clients.add(websocket)
@classmethod
async def ws_disconnect(cls, room_id: int, websocket: WebSocket):
async with cls._lock:
clients = cls._room_clients.get(room_id)
if not clients:
return
clients.discard(websocket)
if not clients:
cls._room_clients.pop(room_id, None)
@classmethod
async def ws_broadcast(cls, room_id: int, payload: dict):
clients = list(cls._room_clients.get(room_id, set()))
if not clients:
return
text = json.dumps(payload)
stale: list[WebSocket] = []
for ws in clients:
try:
if ws.application_state == WebSocketState.CONNECTED:
await ws.send_text(text)
else:
stale.append(ws)
except Exception:
stale.append(ws)
if stale:
async with cls._lock:
pool = cls._room_clients.get(room_id, set())
for ws in stale:
pool.discard(ws)
if not pool:
cls._room_clients.pop(room_id, None)

View File

@@ -0,0 +1,48 @@
from typing import Literal, Optional
from pydantic import BaseModel, Field
VideoEventType = Literal["play", "pause", "seek", "rate"]
class VideoRoomCreate(BaseModel):
path: str
name: Optional[str] = None
expires_in_days: Optional[int] = 1
control_mode: Literal["host_only", "everyone"] = "everyone"
class VideoRoomJoin(BaseModel):
nickname: Optional[str] = None
class VideoRoomInfo(BaseModel):
id: int
name: str
token: str
path: str
control_mode: str
created_at: str
expires_at: Optional[str] = None
class PlaybackState(BaseModel):
position_ms: int = 0
is_paused: bool = True
playback_rate: float = Field(default=1.0, ge=0.25, le=4.0)
updated_at: str
updated_by: str
class RoomStateResponse(BaseModel):
room: VideoRoomInfo
playback: PlaybackState
members_online: int
class PlaybackEvent(BaseModel):
type: VideoEventType
position_ms: int = 0
playback_rate: float = Field(default=1.0, ge=0.25, le=4.0)
client_ts: Optional[int] = None