From e3a5317f6fc328425f6d909c5dc38c62776f67b4 Mon Sep 17 00:00:00 2001 From: shiyu Date: Sat, 16 May 2026 18:14:52 +0800 Subject: [PATCH] feat: add raw stream upload functionality --- domain/virtual_fs/api.py | 13 +++++ domain/virtual_fs/routes.py | 20 ++++++- web/src/api/vfs.ts | 34 ++++++++++++ .../FileExplorerPage/hooks/useUploader.ts | 54 ++++++++++++++----- 4 files changed, 106 insertions(+), 15 deletions(-) diff --git a/domain/virtual_fs/api.py b/domain/virtual_fs/api.py index a00814d..caf984b 100644 --- a/domain/virtual_fs/api.py +++ b/domain/virtual_fs/api.py @@ -172,6 +172,19 @@ async def upload_stream( return success(result) +@router.put("/upload-raw/{full_path:path}") +@audit(action=AuditAction.UPLOAD, description="原始流上传文件") +@require_path_permission(PathAction.WRITE, "full_path") +async def upload_raw_stream( + request: Request, + current_user: Annotated[User, Depends(get_current_active_user)], + full_path: str, + overwrite: bool = Query(True, description="是否覆盖已存在文件"), +): + result = await VirtualFSService.upload_raw_stream(full_path, request, overwrite) + return success(result) + + @router.get("/{full_path:path}") @audit(action=AuditAction.READ, description="浏览目录") @require_path_permission(PathAction.READ, "full_path") diff --git a/domain/virtual_fs/routes.py b/domain/virtual_fs/routes.py index b4ed506..a00df1e 100644 --- a/domain/virtual_fs/routes.py +++ b/domain/virtual_fs/routes.py @@ -2,7 +2,7 @@ import mimetypes import re from urllib.parse import quote -from fastapi import HTTPException, UploadFile +from fastapi import HTTPException, Request, UploadFile from fastapi.responses import Response from domain.config import ConfigService @@ -271,6 +271,24 @@ class VirtualFSRouteMixin(VirtualFSTempLinkMixin): size = int(result or 0) return {"uploaded": True, "path": path, "size": size, "overwrite": overwrite} + @classmethod + async def upload_raw_stream(cls, full_path: str, request: Request, overwrite: bool): + full_path = cls._normalize_path(full_path) + if full_path.endswith("/"): + raise HTTPException(400, detail="Path must be a file") + + result = await cls.write_file_stream(full_path, request.stream(), overwrite=overwrite) + path = full_path + size = 0 + if isinstance(result, dict): + path = result.get("path") or path + size_val = result.get("size") + if isinstance(size_val, int): + size = size_val + else: + size = int(result or 0) + return {"uploaded": True, "path": path, "size": size, "overwrite": overwrite} + @classmethod async def list_directory(cls, full_path: str, page_num: int, page_size: int, sort_by: str, sort_order: str): full_path = cls._normalize_path(full_path) diff --git a/web/src/api/vfs.ts b/web/src/api/vfs.ts index dcfcaff..2f3c1e0 100644 --- a/web/src/api/vfs.ts +++ b/web/src/api/vfs.ts @@ -100,6 +100,40 @@ export const vfsApi = { getTempLinkToken: (path: string, expiresIn: number = 3600) => request<{token: string, path: string, url: string}>(`/fs/temp-link/${encodeURI(path.replace(/^\/+/, ''))}?expires_in=${expiresIn}`), getTempPublicUrl: (token: string) => `${API_BASE_URL}/fs/public/${token}`, + uploadRaw: (fullPath: string, file: File, overwrite: boolean = true, onProgress?: (loaded: number, total: number) => void) => { + const enc = encodeURI(fullPath.replace(/^\/+/, '')); + return new Promise((resolve, reject) => { + const xhr = new XMLHttpRequest(); + xhr.open('PUT', `${API_BASE_URL}/fs/upload-raw/${enc}?overwrite=${overwrite}`); + const token = localStorage.getItem('token'); + if (token) xhr.setRequestHeader('Authorization', `Bearer ${token}`); + xhr.setRequestHeader('Content-Type', file.type || 'application/octet-stream'); + xhr.upload.onprogress = (ev) => { + if (ev.lengthComputable && onProgress) onProgress(ev.loaded, ev.total); + }; + xhr.onreadystatechange = () => { + if (xhr.readyState === 4) { + if (xhr.status >= 200 && xhr.status < 300) { + try { + const json = JSON.parse(xhr.responseText); + if (json.code === 0) return resolve(json.data); + return reject(new Error(json.msg || json.message || 'Upload failed')); + } catch { + return reject(new Error('Invalid response')); + } + } else { + let err = 'Upload failed'; + try { + const json = JSON.parse(xhr.responseText); + err = json.detail || json.msg || json.message || err; + } catch { void 0; } + reject(new Error(err)); + } + } + }; + xhr.send(file); + }); + }, uploadStream: (fullPath: string, file: File, overwrite: boolean = true, onProgress?: (loaded: number, total: number) => void) => { const enc = encodeURI(fullPath.replace(/^\/+/, '')); return new Promise((resolve, reject) => { diff --git a/web/src/pages/FileExplorerPage/hooks/useUploader.ts b/web/src/pages/FileExplorerPage/hooks/useUploader.ts index 0bc7900..e57e5df 100644 --- a/web/src/pages/FileExplorerPage/hooks/useUploader.ts +++ b/web/src/pages/FileExplorerPage/hooks/useUploader.ts @@ -43,6 +43,8 @@ interface RawUploadDirectory { type RawUploadItem = RawUploadFile | RawUploadDirectory; +const MAX_UPLOAD_CONCURRENCY = 3; + const generateId = (() => { const cryptoApi = typeof crypto !== 'undefined' ? crypto : undefined; return () => { @@ -457,15 +459,15 @@ export function useUploader(path: string, onUploadComplete: () => void) { } }, [ensureDirectory, updateFile, t]); - const processFileTask = useCallback(async (task: UploadFile) => { + const prepareFileTask = useCallback(async (task: UploadFile): Promise<{ task: UploadFile; overwrite: boolean } | null> => { if (!task.file) { updateFile(task.id, { status: 'error', error: t('Missing file content') }); - return; + return null; } if (skipAllRef.current) { updateFile(task.id, { status: 'skipped', progress: 0 }); - return; + return null; } let shouldOverwrite = overwriteAllRef.current; @@ -475,19 +477,28 @@ export function useUploader(path: string, onUploadComplete: () => void) { const decision = await awaitConflictDecision(task); if (decision === 'skip') { updateFile(task.id, { status: 'skipped', progress: 0 }); - return; + return null; } shouldOverwrite = true; } } setConflict(null); - updateFile(task.id, { status: 'uploading', progress: 0, loadedBytes: 0 }); - const parentDir = task.targetPath.replace(/\/[^/]+$/, '') || '/'; + await ensureDirectoryTree(parentDir); + updateFile(task.id, { status: 'pending', progress: 0, loadedBytes: 0 }); + return { task, overwrite: shouldOverwrite }; + }, [ensureDirectoryTree, awaitConflictDecision, updateFile, t]); + + const uploadPreparedFile = useCallback(async (task: UploadFile, shouldOverwrite: boolean) => { + if (!task.file) { + updateFile(task.id, { status: 'error', error: t('Missing file content') }); + return; + } + + updateFile(task.id, { status: 'uploading', progress: 0, loadedBytes: 0 }); try { - await ensureDirectoryTree(parentDir); - const uploadResult = await vfsApi.uploadStream(task.targetPath, task.file, shouldOverwrite, (loaded, total) => { + const uploadResult = await vfsApi.uploadRaw(task.targetPath, task.file, shouldOverwrite, (loaded, total) => { mutateFiles((prev) => prev.map((f) => { if (f.id !== task.id) return f; const effectiveTotal = total > 0 ? total : f.size; @@ -506,22 +517,34 @@ export function useUploader(path: string, onUploadComplete: () => void) { const finalSize = typeof uploadResult?.size === 'number' && uploadResult.size > 0 ? uploadResult.size : task.size; - const link = await vfsApi.getTempLinkToken(actualPath, 60 * 60 * 24 * 365 * 10); - const permanentLink = vfsApi.getTempPublicUrl(link.token); updateFile(task.id, { status: 'success', progress: 100, loadedBytes: finalSize, size: finalSize, targetPath: actualPath, - permanentLink, }); } catch (err: unknown) { const error = err instanceof Error ? err.message : t('Upload failed'); updateFile(task.id, { status: 'error', error, progress: 0 }); message.error(`${task.relativePath}: ${error}`); } - }, [ensureDirectoryTree, awaitConflictDecision, mutateFiles, updateFile, t]); + }, [mutateFiles, updateFile, t]); + + const uploadPreparedFiles = useCallback(async (preparedFiles: Array<{ task: UploadFile; overwrite: boolean }>) => { + let nextIndex = 0; + const workerCount = Math.min(MAX_UPLOAD_CONCURRENCY, preparedFiles.length); + + const runWorker = async () => { + while (nextIndex < preparedFiles.length) { + const current = preparedFiles[nextIndex]; + nextIndex += 1; + await uploadPreparedFile(current.task, current.overwrite); + } + }; + + await Promise.all(Array.from({ length: workerCount }, runWorker)); + }, [uploadPreparedFile]); const startUpload = useCallback(async () => { if (isUploadingRef.current) return; @@ -530,6 +553,7 @@ export function useUploader(path: string, onUploadComplete: () => void) { isUploadingRef.current = true; setIsUploading(true); try { + const preparedFiles: Array<{ task: UploadFile; overwrite: boolean }> = []; for (const task of filesRef.current) { if (task.status !== 'pending' && task.status !== 'waiting') { continue; @@ -537,15 +561,17 @@ export function useUploader(path: string, onUploadComplete: () => void) { if (task.type === 'directory') { await processDirectoryTask(task); } else { - await processFileTask(task); + const prepared = await prepareFileTask(task); + if (prepared) preparedFiles.push(prepared); } } + await uploadPreparedFiles(preparedFiles); onUploadComplete(); } finally { isUploadingRef.current = false; setIsUploading(false); } - }, [onUploadComplete, processDirectoryTask, processFileTask]); + }, [onUploadComplete, processDirectoryTask, prepareFileTask, uploadPreparedFiles]); const totalFileBytes = useMemo( () => files.reduce((acc, f) => acc + (f.type === 'file' ? f.size : 0), 0),