From f900bcf2ca11b689490f0d185594755cc2dd3e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=97=B6=E9=9B=A8?= Date: Fri, 15 May 2026 20:49:17 +0800 Subject: [PATCH] feat: switch watch sync from polling to websocket --- api/routers.py | 3 + domain/video_room/__init__.py | 3 + domain/video_room/api.py | 97 ++++++++++++++++++++ domain/video_room/service.py | 142 +++++++++++++++++++++++++++++ domain/video_room/types.py | 48 ++++++++++ models/database.py | 16 ++++ web/src/api/client.ts | 2 + web/src/api/videoRoom.ts | 47 ++++++++++ web/src/pages/PublicWatchPage.tsx | 147 ++++++++++++++++++++++++++++++ web/src/router/index.tsx | 4 +- 10 files changed, 508 insertions(+), 1 deletion(-) create mode 100644 domain/video_room/__init__.py create mode 100644 domain/video_room/api.py create mode 100644 domain/video_room/service.py create mode 100644 domain/video_room/types.py create mode 100644 web/src/api/videoRoom.ts create mode 100644 web/src/pages/PublicWatchPage.tsx diff --git a/api/routers.py b/api/routers.py index fb311a5..7518360 100644 --- a/api/routers.py +++ b/api/routers.py @@ -21,6 +21,7 @@ from domain.permission import api as permission from domain.user import api as user from domain.role import api as role from domain.recent_files import api as recent_files +from domain.video_room import api as video_room def include_routers(app: FastAPI): @@ -48,3 +49,5 @@ def include_routers(app: FastAPI): app.include_router(permission.router) app.include_router(user.router) app.include_router(role.router) + app.include_router(video_room.router) + app.include_router(video_room.public_router) diff --git a/domain/video_room/__init__.py b/domain/video_room/__init__.py new file mode 100644 index 0000000..d70df6d --- /dev/null +++ b/domain/video_room/__init__.py @@ -0,0 +1,3 @@ +from .service import VideoRoomService + +__all__ = ["VideoRoomService"] diff --git a/domain/video_room/api.py b/domain/video_room/api.py new file mode 100644 index 0000000..b847200 --- /dev/null +++ b/domain/video_room/api.py @@ -0,0 +1,97 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends, Request, WebSocket, WebSocketDisconnect + +from api.response import success +from domain.auth import User, get_current_active_user +from domain.video_room.service import VideoRoomService +from domain.video_room.types import PlaybackEvent, VideoRoomCreate, VideoRoomInfo + +router = APIRouter(prefix="/api/video-rooms", tags=["Video Rooms"]) +public_router = APIRouter(prefix="/api/watch", tags=["Video Rooms - Public"]) + + +@router.post("", response_model=VideoRoomInfo) +async def create_video_room( + request: Request, + payload: VideoRoomCreate, + current_user: Annotated[User, Depends(get_current_active_user)], +): + room = await VideoRoomService.create_room( + user_id=current_user.id, + path=payload.path, + name=payload.name, + expires_in_days=payload.expires_in_days, + control_mode=payload.control_mode, + ) + return VideoRoomInfo( + id=room.id, + name=room.name, + token=room.token, + path=room.path, + control_mode=room.control_mode, + created_at=room.created_at.isoformat(), + expires_at=room.expires_at.isoformat() if room.expires_at else None, + ) + + +@public_router.get("/{token}") +async def get_watch_room(request: Request, token: str): + room = await VideoRoomService.get_room_by_token(token) + state = await VideoRoomService.get_state(room.id) + return success({"room": VideoRoomInfo( + id=room.id, + name=room.name, + token=room.token, + path=room.path, + control_mode=room.control_mode, + created_at=room.created_at.isoformat(), + expires_at=room.expires_at.isoformat() if room.expires_at else None, + ).model_dump(), "playback": state}) + + +@public_router.websocket("/{token}/ws") +async def watch_room_ws(websocket: WebSocket, token: str): + room = await VideoRoomService.get_room_by_token(token) + actor = websocket.query_params.get("actor") or "guest" + await VideoRoomService.ws_connect(room.id, websocket) + try: + state = await VideoRoomService.get_state(room.id) + await websocket.send_json({"type": "snapshot", "playback": state}) + while True: + msg = await websocket.receive_json() + if msg.get("type") == "ping": + await websocket.send_json({"type": "pong"}) + continue + event_type = msg.get("event") + if event_type not in {"play", "pause", "seek", "rate"}: + continue + position_ms = int(msg.get("position_ms") or 0) + playback_rate = float(msg.get("playback_rate") or 1.0) + state = await VideoRoomService.apply_event( + room=room, + actor=actor, + event_type=event_type, + position_ms=position_ms, + playback_rate=playback_rate, + ) + await VideoRoomService.ws_broadcast(room.id, {"type": "playback", "event": event_type, "playback": state, "actor": actor}) + except WebSocketDisconnect: + pass + finally: + await VideoRoomService.ws_disconnect(room.id, websocket) + + +@public_router.post("/{token}/events") +async def push_watch_event(request: Request, token: str, payload: PlaybackEvent): + room = await VideoRoomService.get_room_by_token(token) + actor = request.headers.get("X-Watch-Actor", "guest") + state = await VideoRoomService.apply_event( + room=room, + actor=actor, + event_type=payload.type, + position_ms=payload.position_ms, + playback_rate=payload.playback_rate, + ) + await VideoRoomService.ws_broadcast(room.id, {"type": "playback", "event": payload.type, "playback": state, "actor": actor}) + return success({"playback": state}) diff --git a/domain/video_room/service.py b/domain/video_room/service.py new file mode 100644 index 0000000..95a238b --- /dev/null +++ b/domain/video_room/service.py @@ -0,0 +1,142 @@ +import asyncio +import json +import secrets +from datetime import datetime, timedelta, timezone +from typing import Optional + +from fastapi import HTTPException, WebSocket +from starlette.websockets import WebSocketState + +from models.database import VideoRoom + + +VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".m4v"} + + +class VideoRoomService: + _runtime_states: dict[int, dict] = {} + _room_clients: dict[int, set[WebSocket]] = {} + _lock = asyncio.Lock() + + @classmethod + def _now(cls) -> datetime: + return datetime.now(timezone.utc) + + @classmethod + def _iso_now(cls) -> str: + return cls._now().isoformat() + + @classmethod + def _ensure_video_path(cls, path: str) -> None: + p = path.lower() + if not any(p.endswith(ext) for ext in VIDEO_EXTS): + raise HTTPException(status_code=400, detail="仅支持视频文件创建视频间") + + @classmethod + def _calc_expires_at(cls, expires_in_days: Optional[int]) -> Optional[datetime]: + if expires_in_days is None or expires_in_days <= 0: + return None + return cls._now() + timedelta(days=expires_in_days) + + @classmethod + async def create_room(cls, *, user_id: int, path: str, name: Optional[str], expires_in_days: Optional[int], control_mode: str): + cls._ensure_video_path(path) + token = secrets.token_urlsafe(18) + room_name = name or f"{path.split('/')[-1]} 的视频间" + room = await VideoRoom.create( + token=token, + name=room_name, + path=path, + owner_id=user_id, + control_mode=control_mode, + expires_at=cls._calc_expires_at(expires_in_days), + ) + cls._runtime_states[room.id] = { + "position_ms": 0, + "is_paused": True, + "playback_rate": 1.0, + "updated_at": cls._iso_now(), + "updated_by": f"user:{user_id}", + } + return room + + @classmethod + async def get_room_by_token(cls, token: str) -> VideoRoom: + room = await VideoRoom.get_or_none(token=token) + if not room: + raise HTTPException(status_code=404, detail="视频间不存在") + if room.expires_at and room.expires_at < cls._now(): + raise HTTPException(status_code=410, detail="视频间已过期") + return room + + @classmethod + async def get_state(cls, room_id: int) -> dict: + return cls._runtime_states.setdefault( + room_id, + { + "position_ms": 0, + "is_paused": True, + "playback_rate": 1.0, + "updated_at": cls._iso_now(), + "updated_by": "system", + }, + ) + + @classmethod + async def apply_event(cls, *, room: VideoRoom, actor: str, event_type: str, position_ms: int, playback_rate: float): + state = await cls.get_state(room.id) + if room.control_mode == "host_only" and actor != f"user:{room.owner_id}": + raise HTTPException(status_code=403, detail="仅房主可控制播放") + + if event_type == "play": + state["is_paused"] = False + elif event_type == "pause": + state["is_paused"] = True + elif event_type == "seek": + state["position_ms"] = max(position_ms, 0) + elif event_type == "rate": + state["playback_rate"] = playback_rate + state["updated_at"] = cls._iso_now() + state["updated_by"] = actor + return state + + @classmethod + async def ws_connect(cls, room_id: int, websocket: WebSocket): + await websocket.accept() + async with cls._lock: + clients = cls._room_clients.setdefault(room_id, set()) + clients.add(websocket) + + @classmethod + async def ws_disconnect(cls, room_id: int, websocket: WebSocket): + async with cls._lock: + clients = cls._room_clients.get(room_id) + if not clients: + return + clients.discard(websocket) + if not clients: + cls._room_clients.pop(room_id, None) + + @classmethod + async def ws_broadcast(cls, room_id: int, payload: dict): + clients = list(cls._room_clients.get(room_id, set())) + if not clients: + return + text = json.dumps(payload) + stale: list[WebSocket] = [] + for ws in clients: + try: + if ws.application_state == WebSocketState.CONNECTED: + await ws.send_text(text) + else: + stale.append(ws) + except Exception: + stale.append(ws) + + if stale: + async with cls._lock: + pool = cls._room_clients.get(room_id, set()) + for ws in stale: + pool.discard(ws) + if not pool: + cls._room_clients.pop(room_id, None) diff --git a/domain/video_room/types.py b/domain/video_room/types.py new file mode 100644 index 0000000..7c16e45 --- /dev/null +++ b/domain/video_room/types.py @@ -0,0 +1,48 @@ +from typing import Literal, Optional + +from pydantic import BaseModel, Field + + +VideoEventType = Literal["play", "pause", "seek", "rate"] + + +class VideoRoomCreate(BaseModel): + path: str + name: Optional[str] = None + expires_in_days: Optional[int] = 1 + control_mode: Literal["host_only", "everyone"] = "everyone" + + +class VideoRoomJoin(BaseModel): + nickname: Optional[str] = None + + +class VideoRoomInfo(BaseModel): + id: int + name: str + token: str + path: str + control_mode: str + created_at: str + expires_at: Optional[str] = None + + +class PlaybackState(BaseModel): + position_ms: int = 0 + is_paused: bool = True + playback_rate: float = Field(default=1.0, ge=0.25, le=4.0) + updated_at: str + updated_by: str + + +class RoomStateResponse(BaseModel): + room: VideoRoomInfo + playback: PlaybackState + members_online: int + + +class PlaybackEvent(BaseModel): + type: VideoEventType + position_ms: int = 0 + playback_rate: float = Field(default=1.0, ge=0.25, le=4.0) + client_ts: Optional[int] = None diff --git a/models/database.py b/models/database.py index cc4876b..290aaf9 100644 --- a/models/database.py +++ b/models/database.py @@ -291,3 +291,19 @@ class Plugin(Model): class Meta: table = "plugins" + + +class VideoRoom(Model): + id = fields.IntField(pk=True) + token = fields.CharField(max_length=120, unique=True, index=True) + name = fields.CharField(max_length=255) + path = fields.CharField(max_length=4096) + owner: fields.ForeignKeyRelation[UserAccount] = fields.ForeignKeyField( + "models.UserAccount", related_name="video_rooms", on_delete=fields.CASCADE + ) + control_mode = fields.CharField(max_length=20, default="everyone") + created_at = fields.DatetimeField(auto_now_add=True) + expires_at = fields.DatetimeField(null=True) + + class Meta: + table = "video_rooms" diff --git a/web/src/api/client.ts b/web/src/api/client.ts index 6689f52..23a642a 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -75,3 +75,5 @@ export { adaptersApi, type AdapterItem, type AdapterTypeField, type AdapterTypeM export { shareApi, type ShareInfo, type ShareInfoWithPassword } from './share'; export { offlineDownloadsApi, type OfflineDownloadTask, type OfflineDownloadCreate, type TaskProgress } from './offlineDownloads'; export default request; + +export { videoRoomApi, type VideoRoomInfo, type VideoRoomState } from './videoRoom'; diff --git a/web/src/api/videoRoom.ts b/web/src/api/videoRoom.ts new file mode 100644 index 0000000..96c1d99 --- /dev/null +++ b/web/src/api/videoRoom.ts @@ -0,0 +1,47 @@ +import request, { API_BASE_URL } from './client'; + +export interface VideoRoomInfo { + id: number; + name: string; + token: string; + path: string; + control_mode: 'host_only' | 'everyone'; + created_at: string; + expires_at?: string | null; +} + +export interface VideoPlaybackState { + position_ms: number; + is_paused: boolean; + playback_rate: number; + updated_at: string; + updated_by: string; +} + +export interface VideoRoomState { + room: VideoRoomInfo; + playback: VideoPlaybackState; +} + +export interface VideoRoomCreatePayload { + path: string; + name?: string; + expires_in_days?: number; + control_mode?: 'host_only' | 'everyone'; +} + +export const videoRoomApi = { + create: (payload: VideoRoomCreatePayload) => request('/video-rooms', { method: 'POST', json: payload }), + getState: (token: string) => request(`/watch/${token}`), + pushEvent: (token: string, payload: { type: 'play' | 'pause' | 'seek' | 'rate'; position_ms?: number; playback_rate?: number }, actorId?: string) => + request<{ playback: VideoPlaybackState }>(`/watch/${token}/events`, { + method: 'POST', + json: payload, + headers: actorId ? { 'X-Watch-Actor': actorId } : undefined, + }), + streamUrl: (token: string, path: string) => `${API_BASE_URL}/s/${token}/download?path=${encodeURIComponent(path)}`, + connectWs: (token: string, actorId: string) => { + const proto = window.location.protocol === 'https:' ? 'wss' : 'ws'; + return new WebSocket(`${proto}://${window.location.host}/api/watch/${token}/ws?actor=${encodeURIComponent(actorId)}`); + }, +}; diff --git a/web/src/pages/PublicWatchPage.tsx b/web/src/pages/PublicWatchPage.tsx new file mode 100644 index 0000000..e0c2000 --- /dev/null +++ b/web/src/pages/PublicWatchPage.tsx @@ -0,0 +1,147 @@ +import { useEffect, useMemo, useRef, useState } from 'react'; +import { useParams } from 'react-router'; +import { Alert, Button, Card, Empty, Input, Space, Spin, Typography, message } from 'antd'; +import { videoRoomApi, type VideoRoomState } from '../api/videoRoom'; + +const { Title, Text } = Typography; + +export default function PublicWatchPage() { + const { token } = useParams(); + const [data, setData] = useState(null); + const [loading, setLoading] = useState(true); + const [err, setErr] = useState(''); + const [wsConnected, setWsConnected] = useState(false); + const videoRef = useRef(null); + const syncingRef = useRef(false); + const wsRef = useRef(null); + + const actorId = useMemo(() => { + const key = 'watch_actor_id'; + const cached = localStorage.getItem(key); + if (cached) return cached; + const v = `guest:${Math.random().toString(36).slice(2, 10)}`; + localStorage.setItem(key, v); + return v; + }, []); + + useEffect(() => { + if (!token) return; + const load = async () => { + try { + const res = await videoRoomApi.getState(token); + setData(res); + setErr(''); + } catch (e: any) { + setErr(e.message || '加载视频间失败'); + } finally { + setLoading(false); + } + }; + void load(); + }, [token]); + + useEffect(() => { + if (!token) return; + let closedByCleanup = false; + let reconnectTimer: number | null = null; + + const connect = () => { + const ws = videoRoomApi.connectWs(token, actorId); + wsRef.current = ws; + ws.onopen = () => setWsConnected(true); + ws.onmessage = (evt) => { + try { + const msg = JSON.parse(evt.data); + if (msg.type === 'snapshot' || msg.type === 'playback') { + setData((prev) => { + if (!prev) return prev; + return { ...prev, playback: msg.playback }; + }); + } + } catch { + void 0; + } + }; + ws.onclose = () => { + setWsConnected(false); + if (!closedByCleanup) { + reconnectTimer = window.setTimeout(connect, 1500); + } + }; + ws.onerror = () => { + setWsConnected(false); + }; + }; + + connect(); + + return () => { + closedByCleanup = true; + setWsConnected(false); + if (reconnectTimer) window.clearTimeout(reconnectTimer); + wsRef.current?.close(); + wsRef.current = null; + }; + }, [token, actorId]); + + useEffect(() => { + const video = videoRef.current; + const pb = data?.playback; + if (!video || !pb) return; + + syncingRef.current = true; + const targetSec = (pb.position_ms || 0) / 1000; + if (Math.abs(video.currentTime - targetSec) > 1.2) video.currentTime = targetSec; + if (Math.abs(video.playbackRate - pb.playback_rate) > 0.01) video.playbackRate = pb.playback_rate; + if (pb.is_paused && !video.paused) video.pause(); + if (!pb.is_paused && video.paused) void video.play().catch(() => void 0); + setTimeout(() => { syncingRef.current = false; }, 120); + }, [data?.playback?.updated_at]); + + const sendEvent = (payload: { event: 'play' | 'pause' | 'seek' | 'rate'; position_ms?: number; playback_rate?: number }) => { + const ws = wsRef.current; + if (!ws || ws.readyState !== WebSocket.OPEN) return; + ws.send(JSON.stringify(payload)); + }; + + if (loading) return
; + if (err || !data) return
; + + return ( +
+ + + {data.room.name} + 同步状态:{data.playback.is_paused ? '暂停' : '播放中'} | 倍速 {data.playback.playback_rate}x + {wsConnected ? '实时同步已连接' : '实时同步断开,正在重连…'} + + + + + + +
+ ); +} diff --git a/web/src/router/index.tsx b/web/src/router/index.tsx index 5369c45..9565915 100644 --- a/web/src/router/index.tsx +++ b/web/src/router/index.tsx @@ -7,6 +7,7 @@ import SetupPage from '../pages/SetupPage.tsx'; import PublicSharePage from '../pages/PublicSharePage'; import ForgotPasswordPage from '../pages/ForgotPasswordPage'; import ResetPasswordPage from '../pages/ResetPasswordPage'; +import PublicWatchPage from '../pages/PublicWatchPage'; import { useAuth } from '../contexts/AuthContext'; import type { JSX } from 'react'; @@ -16,6 +17,7 @@ export const routes: RouteObject[] = [ { path: '/login', element: }, { path: '/register', element: }, { path: '/share/:token', element: }, + { path: '/watch/:token', element: }, { path: '/setup', element: }, { path: '/forgot-password', element: }, { path: '/reset-password', element: }, @@ -26,7 +28,7 @@ function RequireAuth({ children }: { children: JSX.Element }) { const location = useLocation(); const publicPaths = ['/login', '/register', '/forgot-password', '/reset-password']; const isPublic = publicPaths.some((p) => location.pathname.startsWith(p)); - if (!isAuthenticated && !location.pathname.startsWith('/share/') && !isPublic) { + if (!isAuthenticated && !location.pathname.startsWith('/share/') && !location.pathname.startsWith('/watch/') && !isPublic) { return ; } return children;