Compare commits

...

2 Commits

Author SHA1 Message Date
时雨
f900bcf2ca feat: switch watch sync from polling to websocket 2026-05-15 20:49:17 +08:00
dependabot[bot]
d5a24c69e1 chore(deps): bump python-multipart in the uv group across 1 directory (#121)
Bumps the uv group with 1 update in the / directory: [python-multipart](https://github.com/Kludex/python-multipart).


Updates `python-multipart` from 0.0.26 to 0.0.27
- [Release notes](https://github.com/Kludex/python-multipart/releases)
- [Changelog](https://github.com/Kludex/python-multipart/blob/main/CHANGELOG.md)
- [Commits](https://github.com/Kludex/python-multipart/compare/0.0.26...0.0.27)

---
updated-dependencies:
- dependency-name: python-multipart
  dependency-version: 0.0.27
  dependency-type: direct:production
  dependency-group: uv
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-05-10 22:12:43 +08:00
12 changed files with 513 additions and 6 deletions

View File

@@ -21,6 +21,7 @@ from domain.permission import api as permission
from domain.user import api as user from domain.user import api as user
from domain.role import api as role from domain.role import api as role
from domain.recent_files import api as recent_files from domain.recent_files import api as recent_files
from domain.video_room import api as video_room
def include_routers(app: FastAPI): def include_routers(app: FastAPI):
@@ -48,3 +49,5 @@ def include_routers(app: FastAPI):
app.include_router(permission.router) app.include_router(permission.router)
app.include_router(user.router) app.include_router(user.router)
app.include_router(role.router) app.include_router(role.router)
app.include_router(video_room.router)
app.include_router(video_room.public_router)

View File

@@ -0,0 +1,3 @@
from .service import VideoRoomService
__all__ = ["VideoRoomService"]

97
domain/video_room/api.py Normal file
View File

@@ -0,0 +1,97 @@
from typing import Annotated
from fastapi import APIRouter, Depends, Request, WebSocket, WebSocketDisconnect
from api.response import success
from domain.auth import User, get_current_active_user
from domain.video_room.service import VideoRoomService
from domain.video_room.types import PlaybackEvent, VideoRoomCreate, VideoRoomInfo
router = APIRouter(prefix="/api/video-rooms", tags=["Video Rooms"])
public_router = APIRouter(prefix="/api/watch", tags=["Video Rooms - Public"])
@router.post("", response_model=VideoRoomInfo)
async def create_video_room(
request: Request,
payload: VideoRoomCreate,
current_user: Annotated[User, Depends(get_current_active_user)],
):
room = await VideoRoomService.create_room(
user_id=current_user.id,
path=payload.path,
name=payload.name,
expires_in_days=payload.expires_in_days,
control_mode=payload.control_mode,
)
return VideoRoomInfo(
id=room.id,
name=room.name,
token=room.token,
path=room.path,
control_mode=room.control_mode,
created_at=room.created_at.isoformat(),
expires_at=room.expires_at.isoformat() if room.expires_at else None,
)
@public_router.get("/{token}")
async def get_watch_room(request: Request, token: str):
room = await VideoRoomService.get_room_by_token(token)
state = await VideoRoomService.get_state(room.id)
return success({"room": VideoRoomInfo(
id=room.id,
name=room.name,
token=room.token,
path=room.path,
control_mode=room.control_mode,
created_at=room.created_at.isoformat(),
expires_at=room.expires_at.isoformat() if room.expires_at else None,
).model_dump(), "playback": state})
@public_router.websocket("/{token}/ws")
async def watch_room_ws(websocket: WebSocket, token: str):
room = await VideoRoomService.get_room_by_token(token)
actor = websocket.query_params.get("actor") or "guest"
await VideoRoomService.ws_connect(room.id, websocket)
try:
state = await VideoRoomService.get_state(room.id)
await websocket.send_json({"type": "snapshot", "playback": state})
while True:
msg = await websocket.receive_json()
if msg.get("type") == "ping":
await websocket.send_json({"type": "pong"})
continue
event_type = msg.get("event")
if event_type not in {"play", "pause", "seek", "rate"}:
continue
position_ms = int(msg.get("position_ms") or 0)
playback_rate = float(msg.get("playback_rate") or 1.0)
state = await VideoRoomService.apply_event(
room=room,
actor=actor,
event_type=event_type,
position_ms=position_ms,
playback_rate=playback_rate,
)
await VideoRoomService.ws_broadcast(room.id, {"type": "playback", "event": event_type, "playback": state, "actor": actor})
except WebSocketDisconnect:
pass
finally:
await VideoRoomService.ws_disconnect(room.id, websocket)
@public_router.post("/{token}/events")
async def push_watch_event(request: Request, token: str, payload: PlaybackEvent):
room = await VideoRoomService.get_room_by_token(token)
actor = request.headers.get("X-Watch-Actor", "guest")
state = await VideoRoomService.apply_event(
room=room,
actor=actor,
event_type=payload.type,
position_ms=payload.position_ms,
playback_rate=payload.playback_rate,
)
await VideoRoomService.ws_broadcast(room.id, {"type": "playback", "event": payload.type, "playback": state, "actor": actor})
return success({"playback": state})

View File

@@ -0,0 +1,142 @@
import asyncio
import json
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import HTTPException, WebSocket
from starlette.websockets import WebSocketState
from models.database import VideoRoom
VIDEO_EXTS = {".mp4", ".mkv", ".avi", ".mov", ".webm", ".m4v"}
class VideoRoomService:
_runtime_states: dict[int, dict] = {}
_room_clients: dict[int, set[WebSocket]] = {}
_lock = asyncio.Lock()
@classmethod
def _now(cls) -> datetime:
return datetime.now(timezone.utc)
@classmethod
def _iso_now(cls) -> str:
return cls._now().isoformat()
@classmethod
def _ensure_video_path(cls, path: str) -> None:
p = path.lower()
if not any(p.endswith(ext) for ext in VIDEO_EXTS):
raise HTTPException(status_code=400, detail="仅支持视频文件创建视频间")
@classmethod
def _calc_expires_at(cls, expires_in_days: Optional[int]) -> Optional[datetime]:
if expires_in_days is None or expires_in_days <= 0:
return None
return cls._now() + timedelta(days=expires_in_days)
@classmethod
async def create_room(cls, *, user_id: int, path: str, name: Optional[str], expires_in_days: Optional[int], control_mode: str):
cls._ensure_video_path(path)
token = secrets.token_urlsafe(18)
room_name = name or f"{path.split('/')[-1]} 的视频间"
room = await VideoRoom.create(
token=token,
name=room_name,
path=path,
owner_id=user_id,
control_mode=control_mode,
expires_at=cls._calc_expires_at(expires_in_days),
)
cls._runtime_states[room.id] = {
"position_ms": 0,
"is_paused": True,
"playback_rate": 1.0,
"updated_at": cls._iso_now(),
"updated_by": f"user:{user_id}",
}
return room
@classmethod
async def get_room_by_token(cls, token: str) -> VideoRoom:
room = await VideoRoom.get_or_none(token=token)
if not room:
raise HTTPException(status_code=404, detail="视频间不存在")
if room.expires_at and room.expires_at < cls._now():
raise HTTPException(status_code=410, detail="视频间已过期")
return room
@classmethod
async def get_state(cls, room_id: int) -> dict:
return cls._runtime_states.setdefault(
room_id,
{
"position_ms": 0,
"is_paused": True,
"playback_rate": 1.0,
"updated_at": cls._iso_now(),
"updated_by": "system",
},
)
@classmethod
async def apply_event(cls, *, room: VideoRoom, actor: str, event_type: str, position_ms: int, playback_rate: float):
state = await cls.get_state(room.id)
if room.control_mode == "host_only" and actor != f"user:{room.owner_id}":
raise HTTPException(status_code=403, detail="仅房主可控制播放")
if event_type == "play":
state["is_paused"] = False
elif event_type == "pause":
state["is_paused"] = True
elif event_type == "seek":
state["position_ms"] = max(position_ms, 0)
elif event_type == "rate":
state["playback_rate"] = playback_rate
state["updated_at"] = cls._iso_now()
state["updated_by"] = actor
return state
@classmethod
async def ws_connect(cls, room_id: int, websocket: WebSocket):
await websocket.accept()
async with cls._lock:
clients = cls._room_clients.setdefault(room_id, set())
clients.add(websocket)
@classmethod
async def ws_disconnect(cls, room_id: int, websocket: WebSocket):
async with cls._lock:
clients = cls._room_clients.get(room_id)
if not clients:
return
clients.discard(websocket)
if not clients:
cls._room_clients.pop(room_id, None)
@classmethod
async def ws_broadcast(cls, room_id: int, payload: dict):
clients = list(cls._room_clients.get(room_id, set()))
if not clients:
return
text = json.dumps(payload)
stale: list[WebSocket] = []
for ws in clients:
try:
if ws.application_state == WebSocketState.CONNECTED:
await ws.send_text(text)
else:
stale.append(ws)
except Exception:
stale.append(ws)
if stale:
async with cls._lock:
pool = cls._room_clients.get(room_id, set())
for ws in stale:
pool.discard(ws)
if not pool:
cls._room_clients.pop(room_id, None)

View File

@@ -0,0 +1,48 @@
from typing import Literal, Optional
from pydantic import BaseModel, Field
VideoEventType = Literal["play", "pause", "seek", "rate"]
class VideoRoomCreate(BaseModel):
path: str
name: Optional[str] = None
expires_in_days: Optional[int] = 1
control_mode: Literal["host_only", "everyone"] = "everyone"
class VideoRoomJoin(BaseModel):
nickname: Optional[str] = None
class VideoRoomInfo(BaseModel):
id: int
name: str
token: str
path: str
control_mode: str
created_at: str
expires_at: Optional[str] = None
class PlaybackState(BaseModel):
position_ms: int = 0
is_paused: bool = True
playback_rate: float = Field(default=1.0, ge=0.25, le=4.0)
updated_at: str
updated_by: str
class RoomStateResponse(BaseModel):
room: VideoRoomInfo
playback: PlaybackState
members_online: int
class PlaybackEvent(BaseModel):
type: VideoEventType
position_ms: int = 0
playback_rate: float = Field(default=1.0, ge=0.25, le=4.0)
client_ts: Optional[int] = None

View File

@@ -291,3 +291,19 @@ class Plugin(Model):
class Meta: class Meta:
table = "plugins" table = "plugins"
class VideoRoom(Model):
id = fields.IntField(pk=True)
token = fields.CharField(max_length=120, unique=True, index=True)
name = fields.CharField(max_length=255)
path = fields.CharField(max_length=4096)
owner: fields.ForeignKeyRelation[UserAccount] = fields.ForeignKeyField(
"models.UserAccount", related_name="video_rooms", on_delete=fields.CASCADE
)
control_mode = fields.CharField(max_length=20, default="everyone")
created_at = fields.DatetimeField(auto_now_add=True)
expires_at = fields.DatetimeField(null=True)
class Meta:
table = "video_rooms"

View File

@@ -17,7 +17,7 @@ dependencies = [
"pymilvus[milvus-lite]>=2.6.5", "pymilvus[milvus-lite]>=2.6.5",
"pysocks>=1.7.1", "pysocks>=1.7.1",
"python-dotenv>=1.2.2", "python-dotenv>=1.2.2",
"python-multipart>=0.0.26", "python-multipart>=0.0.27",
"qdrant-client>=1.16.2", "qdrant-client>=1.16.2",
"setuptools<82", "setuptools<82",
"telethon>=1.42.0", "telethon>=1.42.0",

8
uv.lock generated
View File

@@ -475,7 +475,7 @@ requires-dist = [
{ name = "pymilvus", extras = ["milvus-lite"], specifier = ">=2.6.5" }, { name = "pymilvus", extras = ["milvus-lite"], specifier = ">=2.6.5" },
{ name = "pysocks", specifier = ">=1.7.1" }, { name = "pysocks", specifier = ">=1.7.1" },
{ name = "python-dotenv", specifier = ">=1.2.2" }, { name = "python-dotenv", specifier = ">=1.2.2" },
{ name = "python-multipart", specifier = ">=0.0.26" }, { name = "python-multipart", specifier = ">=0.0.27" },
{ name = "qdrant-client", specifier = ">=1.16.2" }, { name = "qdrant-client", specifier = ">=1.16.2" },
{ name = "setuptools", specifier = "<82" }, { name = "setuptools", specifier = "<82" },
{ name = "telethon", specifier = ">=1.42.0" }, { name = "telethon", specifier = ">=1.42.0" },
@@ -1179,11 +1179,11 @@ wheels = [
[[package]] [[package]]
name = "python-multipart" name = "python-multipart"
version = "0.0.26" version = "0.0.27"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/88/71/b145a380824a960ebd60e1014256dbb7d2253f2316ff2d73dfd8928ec2c3/python_multipart-0.0.26.tar.gz", hash = "sha256:08fadc45918cd615e26846437f50c5d6d23304da32c341f289a617127b081f17", size = 43501, upload-time = "2026-04-10T14:09:59.473Z" } sdist = { url = "https://files.pythonhosted.org/packages/69/9b/f23807317a113dc36e74e75eb265a02dd1a4d9082abc3c1064acd22997c4/python_multipart-0.0.27.tar.gz", hash = "sha256:9870a6a8c5a20a5bf4f07c017bd1489006ff8836cff097b6933355ee2b49b602", size = 44043, upload-time = "2026-04-27T10:51:26.649Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/9a/22/f1925cdda983ab66fc8ec6ec8014b959262747e58bdca26a4e3d1da29d56/python_multipart-0.0.26-py3-none-any.whl", hash = "sha256:c0b169f8c4484c13b0dcf2ef0ec3a4adb255c4b7d18d8e420477d2b1dd03f185", size = 28847, upload-time = "2026-04-10T14:09:58.131Z" }, { url = "https://files.pythonhosted.org/packages/99/78/4126abcbdbd3c559d43e0db7f7b9173fc6befe45d39a2856cc0b8ec2a5a6/python_multipart-0.0.27-py3-none-any.whl", hash = "sha256:6fccfad17a27334bd0193681b369f476eda3409f17381a2d65aa7df3f7275645", size = 29254, upload-time = "2026-04-27T10:51:24.997Z" },
] ]
[[package]] [[package]]

View File

@@ -75,3 +75,5 @@ export { adaptersApi, type AdapterItem, type AdapterTypeField, type AdapterTypeM
export { shareApi, type ShareInfo, type ShareInfoWithPassword } from './share'; export { shareApi, type ShareInfo, type ShareInfoWithPassword } from './share';
export { offlineDownloadsApi, type OfflineDownloadTask, type OfflineDownloadCreate, type TaskProgress } from './offlineDownloads'; export { offlineDownloadsApi, type OfflineDownloadTask, type OfflineDownloadCreate, type TaskProgress } from './offlineDownloads';
export default request; export default request;
export { videoRoomApi, type VideoRoomInfo, type VideoRoomState } from './videoRoom';

47
web/src/api/videoRoom.ts Normal file
View File

@@ -0,0 +1,47 @@
import request, { API_BASE_URL } from './client';
export interface VideoRoomInfo {
id: number;
name: string;
token: string;
path: string;
control_mode: 'host_only' | 'everyone';
created_at: string;
expires_at?: string | null;
}
export interface VideoPlaybackState {
position_ms: number;
is_paused: boolean;
playback_rate: number;
updated_at: string;
updated_by: string;
}
export interface VideoRoomState {
room: VideoRoomInfo;
playback: VideoPlaybackState;
}
export interface VideoRoomCreatePayload {
path: string;
name?: string;
expires_in_days?: number;
control_mode?: 'host_only' | 'everyone';
}
export const videoRoomApi = {
create: (payload: VideoRoomCreatePayload) => request<VideoRoomInfo>('/video-rooms', { method: 'POST', json: payload }),
getState: (token: string) => request<VideoRoomState>(`/watch/${token}`),
pushEvent: (token: string, payload: { type: 'play' | 'pause' | 'seek' | 'rate'; position_ms?: number; playback_rate?: number }, actorId?: string) =>
request<{ playback: VideoPlaybackState }>(`/watch/${token}/events`, {
method: 'POST',
json: payload,
headers: actorId ? { 'X-Watch-Actor': actorId } : undefined,
}),
streamUrl: (token: string, path: string) => `${API_BASE_URL}/s/${token}/download?path=${encodeURIComponent(path)}`,
connectWs: (token: string, actorId: string) => {
const proto = window.location.protocol === 'https:' ? 'wss' : 'ws';
return new WebSocket(`${proto}://${window.location.host}/api/watch/${token}/ws?actor=${encodeURIComponent(actorId)}`);
},
};

View File

@@ -0,0 +1,147 @@
import { useEffect, useMemo, useRef, useState } from 'react';
import { useParams } from 'react-router';
import { Alert, Button, Card, Empty, Input, Space, Spin, Typography, message } from 'antd';
import { videoRoomApi, type VideoRoomState } from '../api/videoRoom';
const { Title, Text } = Typography;
export default function PublicWatchPage() {
const { token } = useParams();
const [data, setData] = useState<VideoRoomState | null>(null);
const [loading, setLoading] = useState(true);
const [err, setErr] = useState('');
const [wsConnected, setWsConnected] = useState(false);
const videoRef = useRef<HTMLVideoElement | null>(null);
const syncingRef = useRef(false);
const wsRef = useRef<WebSocket | null>(null);
const actorId = useMemo(() => {
const key = 'watch_actor_id';
const cached = localStorage.getItem(key);
if (cached) return cached;
const v = `guest:${Math.random().toString(36).slice(2, 10)}`;
localStorage.setItem(key, v);
return v;
}, []);
useEffect(() => {
if (!token) return;
const load = async () => {
try {
const res = await videoRoomApi.getState(token);
setData(res);
setErr('');
} catch (e: any) {
setErr(e.message || '加载视频间失败');
} finally {
setLoading(false);
}
};
void load();
}, [token]);
useEffect(() => {
if (!token) return;
let closedByCleanup = false;
let reconnectTimer: number | null = null;
const connect = () => {
const ws = videoRoomApi.connectWs(token, actorId);
wsRef.current = ws;
ws.onopen = () => setWsConnected(true);
ws.onmessage = (evt) => {
try {
const msg = JSON.parse(evt.data);
if (msg.type === 'snapshot' || msg.type === 'playback') {
setData((prev) => {
if (!prev) return prev;
return { ...prev, playback: msg.playback };
});
}
} catch {
void 0;
}
};
ws.onclose = () => {
setWsConnected(false);
if (!closedByCleanup) {
reconnectTimer = window.setTimeout(connect, 1500);
}
};
ws.onerror = () => {
setWsConnected(false);
};
};
connect();
return () => {
closedByCleanup = true;
setWsConnected(false);
if (reconnectTimer) window.clearTimeout(reconnectTimer);
wsRef.current?.close();
wsRef.current = null;
};
}, [token, actorId]);
useEffect(() => {
const video = videoRef.current;
const pb = data?.playback;
if (!video || !pb) return;
syncingRef.current = true;
const targetSec = (pb.position_ms || 0) / 1000;
if (Math.abs(video.currentTime - targetSec) > 1.2) video.currentTime = targetSec;
if (Math.abs(video.playbackRate - pb.playback_rate) > 0.01) video.playbackRate = pb.playback_rate;
if (pb.is_paused && !video.paused) video.pause();
if (!pb.is_paused && video.paused) void video.play().catch(() => void 0);
setTimeout(() => { syncingRef.current = false; }, 120);
}, [data?.playback?.updated_at]);
const sendEvent = (payload: { event: 'play' | 'pause' | 'seek' | 'rate'; position_ms?: number; playback_rate?: number }) => {
const ws = wsRef.current;
if (!ws || ws.readyState !== WebSocket.OPEN) return;
ws.send(JSON.stringify(payload));
};
if (loading) return <div style={{ padding: 40, textAlign: 'center' }}><Spin /></div>;
if (err || !data) return <div style={{ padding: 40 }}><Empty description={err || '房间不存在'} /></div>;
return (
<div style={{ maxWidth: 980, margin: '24px auto', padding: '0 16px' }}>
<Card>
<Space direction="vertical" size={8} style={{ width: '100%' }}>
<Title level={4} style={{ margin: 0 }}>{data.room.name}</Title>
<Text type="secondary">{data.playback.is_paused ? '暂停' : '播放中'} | {data.playback.playback_rate}x</Text>
<Text type={wsConnected ? 'success' : 'warning'}>{wsConnected ? '实时同步已连接' : '实时同步断开,正在重连…'}</Text>
<Input readOnly value={`${window.location.origin}/watch/${data.room.token}`} addonBefore="分享链接" />
</Space>
</Card>
<Card style={{ marginTop: 16 }}>
<video
ref={videoRef}
src={videoRoomApi.streamUrl(data.room.token, data.room.path)}
style={{ width: '100%', background: '#000', borderRadius: 8 }}
controls
onPlay={() => { if (!syncingRef.current) sendEvent({ event: 'play' }); }}
onPause={() => { if (!syncingRef.current) sendEvent({ event: 'pause' }); }}
onSeeked={() => {
if (syncingRef.current) return;
const ms = Math.floor((videoRef.current?.currentTime || 0) * 1000);
sendEvent({ event: 'seek', position_ms: ms });
}}
onRateChange={() => {
if (syncingRef.current) return;
const rate = videoRef.current?.playbackRate || 1;
sendEvent({ event: 'rate', playback_rate: rate });
}}
/>
<Alert type="info" showIcon style={{ marginTop: 12 }} message="已改为 WebSocket 实时同步,不再使用定时轮询。" />
<Space style={{ marginTop: 12 }}>
<Button onClick={() => { navigator.clipboard.writeText(`${window.location.origin}/watch/${data.room.token}`); message.success('已复制'); }}></Button>
</Space>
</Card>
</div>
);
}

View File

@@ -7,6 +7,7 @@ import SetupPage from '../pages/SetupPage.tsx';
import PublicSharePage from '../pages/PublicSharePage'; import PublicSharePage from '../pages/PublicSharePage';
import ForgotPasswordPage from '../pages/ForgotPasswordPage'; import ForgotPasswordPage from '../pages/ForgotPasswordPage';
import ResetPasswordPage from '../pages/ResetPasswordPage'; import ResetPasswordPage from '../pages/ResetPasswordPage';
import PublicWatchPage from '../pages/PublicWatchPage';
import { useAuth } from '../contexts/AuthContext'; import { useAuth } from '../contexts/AuthContext';
import type { JSX } from 'react'; import type { JSX } from 'react';
@@ -16,6 +17,7 @@ export const routes: RouteObject[] = [
{ path: '/login', element: <LoginPage /> }, { path: '/login', element: <LoginPage /> },
{ path: '/register', element: <RegisterPage /> }, { path: '/register', element: <RegisterPage /> },
{ path: '/share/:token', element: <PublicSharePage /> }, { path: '/share/:token', element: <PublicSharePage /> },
{ path: '/watch/:token', element: <PublicWatchPage /> },
{ path: '/setup', element: <SetupPage /> }, { path: '/setup', element: <SetupPage /> },
{ path: '/forgot-password', element: <ForgotPasswordPage /> }, { path: '/forgot-password', element: <ForgotPasswordPage /> },
{ path: '/reset-password', element: <ResetPasswordPage /> }, { path: '/reset-password', element: <ResetPasswordPage /> },
@@ -26,7 +28,7 @@ function RequireAuth({ children }: { children: JSX.Element }) {
const location = useLocation(); const location = useLocation();
const publicPaths = ['/login', '/register', '/forgot-password', '/reset-password']; const publicPaths = ['/login', '/register', '/forgot-password', '/reset-password'];
const isPublic = publicPaths.some((p) => location.pathname.startsWith(p)); const isPublic = publicPaths.some((p) => location.pathname.startsWith(p));
if (!isAuthenticated && !location.pathname.startsWith('/share/') && !isPublic) { if (!isAuthenticated && !location.pathname.startsWith('/share/') && !location.pathname.startsWith('/watch/') && !isPublic) {
return <Navigate to="/login" replace />; return <Navigate to="/login" replace />;
} }
return children; return children;