From 330e8fd72b9cbc2443e90686c9f1ee6e9dda8e86 Mon Sep 17 00:00:00 2001 From: shiyu Date: Mon, 22 Sep 2025 12:03:39 +0800 Subject: [PATCH] feat(offline-downloads): implement offline download --- api/routers.py | 3 +- api/routes/offline_downloads.py | 79 +++++++++ schemas/offline_downloads.py | 7 + services/offline_download.py | 199 ++++++++++++++++++++++ services/task_queue.py | 30 ++++ web/src/api/client.ts | 1 + web/src/api/offlineDownloads.ts | 35 ++++ web/src/api/tasks.ts | 5 +- web/src/i18n/locales/en.ts | 22 ++- web/src/i18n/locales/zh.ts | 23 ++- web/src/pages/OfflineDownloadPage.tsx | 234 +++++++++++++++++++++++++- 11 files changed, 630 insertions(+), 8 deletions(-) create mode 100644 api/routes/offline_downloads.py create mode 100644 schemas/offline_downloads.py create mode 100644 services/offline_download.py create mode 100644 web/src/api/offlineDownloads.ts diff --git a/api/routers.py b/api/routers.py index a6bd33b..8128a1d 100644 --- a/api/routers.py +++ b/api/routers.py @@ -1,6 +1,6 @@ from fastapi import FastAPI -from .routes import adapters, virtual_fs, auth, config, processors, tasks, logs, share, backup, search, vector_db +from .routes import adapters, virtual_fs, auth, config, processors, tasks, logs, share, backup, search, vector_db, offline_downloads from .routes import webdav from .routes import plugins @@ -20,3 +20,4 @@ def include_routers(app: FastAPI): app.include_router(vector_db.router) app.include_router(plugins.router) app.include_router(webdav.router) + app.include_router(offline_downloads.router) diff --git a/api/routes/offline_downloads.py b/api/routes/offline_downloads.py new file mode 100644 index 0000000..87b9d09 --- /dev/null +++ b/api/routes/offline_downloads.py @@ -0,0 +1,79 @@ +from typing import Annotated + +from fastapi import APIRouter, Depends, HTTPException + +from api.response import success +from schemas.offline_downloads import OfflineDownloadCreate +from services.auth import User, get_current_active_user +from services.logging import LogService +from services.task_queue import task_queue_service, TaskProgress +from services.virtual_fs import path_is_directory + + +router = APIRouter( + prefix="/api/offline-downloads", + tags=["OfflineDownloads"], +) + + +@router.post("/") +async def create_offline_download( + payload: OfflineDownloadCreate, + current_user: Annotated[User, Depends(get_current_active_user)], +): + dest_dir = payload.dest_dir + try: + is_dir = await path_is_directory(dest_dir) + except HTTPException: + is_dir = False + if not is_dir: + raise HTTPException(400, detail="Destination directory not found") + + task = await task_queue_service.add_task( + "offline_http_download", + { + "url": str(payload.url), + "dest_dir": dest_dir, + "filename": payload.filename, + }, + ) + + await task_queue_service.update_progress( + task.id, + TaskProgress( + stage="queued", + percent=0.0, + bytes_total=None, + bytes_done=0, + detail="Waiting to start", + ), + ) + + await LogService.action( + "route:offline_downloads", + f"Offline download task created {task.id}", + details={"url": str(payload.url), "dest_dir": dest_dir, "filename": payload.filename}, + user_id=current_user.id if hasattr(current_user, "id") else None, + ) + + return success({"task_id": task.id}) + + +@router.get("/") +async def list_offline_downloads( + current_user: Annotated[User, Depends(get_current_active_user)], +): + tasks = [t for t in task_queue_service.get_all_tasks() if t.name == "offline_http_download"] + data = [t.dict() for t in tasks] + return success(data) + + +@router.get("/{task_id}") +async def get_offline_download( + task_id: str, + current_user: Annotated[User, Depends(get_current_active_user)], +): + task = task_queue_service.get_task(task_id) + if not task or task.name != "offline_http_download": + raise HTTPException(status_code=404, detail="Task not found") + return success(task.dict()) diff --git a/schemas/offline_downloads.py b/schemas/offline_downloads.py new file mode 100644 index 0000000..dafad07 --- /dev/null +++ b/schemas/offline_downloads.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel, HttpUrl, Field + + +class OfflineDownloadCreate(BaseModel): + url: HttpUrl + dest_dir: str = Field(..., min_length=1) + filename: str = Field(..., min_length=1) diff --git a/services/offline_download.py b/services/offline_download.py new file mode 100644 index 0000000..9d68d71 --- /dev/null +++ b/services/offline_download.py @@ -0,0 +1,199 @@ +import os +import time +from pathlib import Path +from typing import AsyncIterator + +import aiofiles +import aiohttp +from fastapi import HTTPException + +from services.logging import LogService +from services.task_queue import Task, task_queue_service, TaskProgress +from services.virtual_fs import write_file_stream, stat_file + + +TEMP_ROOT = Path("data/tmp/offline_downloads") + + +def _normalize_path(path: str) -> str: + if not path: + return "/" + if not path.startswith("/"): + path = "/" + path + if len(path) > 1 and path.endswith("/"): + path = path.rstrip("/") + return path or "/" + + +async def _path_exists(full_path: str) -> bool: + try: + await stat_file(full_path) + return True + except FileNotFoundError: + return False + except HTTPException as exc: + if exc.status_code == 404: + return False + raise + + +def _split_filename(filename: str) -> tuple[str, str]: + if not filename: + return "", "" + if filename.startswith('.') and filename.count('.') == 1: + return filename, "" + if '.' not in filename: + return filename, "" + stem, ext = filename.rsplit('.', 1) + return stem, f".{ext}" + + +async def _allocate_destination(dest_dir: str, filename: str) -> tuple[str, str]: + dest_dir = _normalize_path(dest_dir) + stem, suffix = _split_filename(filename) + candidate = filename + if dest_dir == "/": + base = "" + else: + base = dest_dir + attempt = 0 + while await _path_exists(f"{base}/{candidate}" if base else f"/{candidate}"): + attempt += 1 + if stem: + candidate = f"{stem} ({attempt}){suffix}" + else: + candidate = f"file ({attempt}){suffix}" if suffix else f"file ({attempt})" + if base: + full_path = f"{base}/{candidate}" + else: + full_path = f"/{candidate}" + return full_path, candidate + + +async def _iter_file(path: Path, chunk_size: int, report_cb) -> AsyncIterator[bytes]: + async with aiofiles.open(path, "rb") as f: + while True: + chunk = await f.read(chunk_size) + if not chunk: + break + await report_cb(len(chunk)) + yield chunk + + +async def run_http_download(task: Task): + params = task.task_info + url = params.get("url") + dest_dir = params.get("dest_dir") + filename = params.get("filename") + + if not url or not dest_dir or not filename: + raise ValueError("Missing required parameters for offline download") + + TEMP_ROOT.mkdir(parents=True, exist_ok=True) + temp_dir = TEMP_ROOT / task.id + temp_dir.mkdir(parents=True, exist_ok=True) + temp_file = temp_dir / "payload" + + bytes_total: int | None = None + bytes_done = 0 + + last_update = time.monotonic() + + await task_queue_service.update_progress( + task.id, + TaskProgress( + stage="downloading", + percent=0.0, + bytes_total=None, + bytes_done=0, + detail="HTTP downloading", + ), + ) + + async def report_download(delta: int, total: int | None): + nonlocal bytes_done, bytes_total, last_update + if total is not None: + bytes_total = total + bytes_done += delta + now = time.monotonic() + if delta and now - last_update < 0.5: + return + last_update = now + percent = None + total_for_display = bytes_total if bytes_total is not None else None + if bytes_total: + percent = min(100.0, round(bytes_done / bytes_total * 100, 2)) + await task_queue_service.update_progress( + task.id, + TaskProgress( + stage="downloading", + percent=percent, + bytes_total=total_for_display, + bytes_done=bytes_done, + detail="HTTP downloading", + ), + ) + + timeout = aiohttp.ClientTimeout(total=None, connect=30) + + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.get(url) as resp: + if resp.status != 200: + raise ValueError(f"HTTP {resp.status} for {url}") + content_length = resp.headers.get("Content-Length") + total_size = int(content_length) if content_length else None + bytes_done = 0 + async with aiofiles.open(temp_file, "wb") as f: + async for chunk in resp.content.iter_chunked(512 * 1024): + if not chunk: + continue + await f.write(chunk) + await report_download(len(chunk), total_size) + # ensure final update + await report_download(0, total_size) + + file_size = os.path.getsize(temp_file) + + bytes_done_transfer = 0 + + async def report_transfer(delta: int): + nonlocal bytes_done_transfer + bytes_done_transfer += delta + percent = min(100.0, round(bytes_done_transfer / file_size * 100, 2)) if file_size else None + await task_queue_service.update_progress( + task.id, + TaskProgress( + stage="transferring", + percent=percent, + bytes_total=file_size or None, + bytes_done=bytes_done_transfer, + detail="Saving to storage", + ), + ) + + async def chunk_iter() -> AsyncIterator[bytes]: + async for chunk in _iter_file(temp_file, 512 * 1024, report_transfer): + yield chunk + + final_path, resolved_name = await _allocate_destination(dest_dir, filename) + + await task_queue_service.update_progress( + task.id, + TaskProgress(stage="transferring", percent=0.0, bytes_total=file_size or None, bytes_done=0, detail="Saving to storage"), + ) + + await write_file_stream(final_path, chunk_iter()) + + await task_queue_service.update_progress( + task.id, + TaskProgress(stage="completed", percent=100.0, bytes_total=file_size or None, bytes_done=file_size, detail="Completed"), + ) + await task_queue_service.update_meta(task.id, {"final_path": final_path, "filename": resolved_name}) + + try: + os.remove(temp_file) + temp_dir.rmdir() + except Exception: + await LogService.info("offline_download", f"Temp cleanup failed for task {task.id}") + + return final_path diff --git a/services/task_queue.py b/services/task_queue.py index f1cbf5a..0f8c0a0 100644 --- a/services/task_queue.py +++ b/services/task_queue.py @@ -13,6 +13,14 @@ class TaskStatus(str, Enum): FAILED = "failed" +class TaskProgress(BaseModel): + stage: str | None = None + percent: float | None = None + bytes_total: int | None = None + bytes_done: int | None = None + detail: str | None = None + + class Task(BaseModel): id: str = Field(default_factory=lambda: uuid.uuid4().hex) name: str @@ -20,6 +28,8 @@ class Task(BaseModel): result: Any = None error: str | None = None task_info: Dict[str, Any] = {} + progress: TaskProgress | None = None + meta: Dict[str, Any] | None = None class TaskQueueService: @@ -41,6 +51,21 @@ class TaskQueueService: def get_all_tasks(self) -> list[Task]: return list(self._tasks.values()) + async def update_progress(self, task_id: str, progress: TaskProgress | Dict[str, Any]): + task = self._tasks.get(task_id) + if not task: + return + if isinstance(progress, TaskProgress): + task.progress = progress + else: + task.progress = TaskProgress(**progress) + + async def update_meta(self, task_id: str, meta: Dict[str, Any]): + task = self._tasks.get(task_id) + if not task: + return + task.meta = (task.meta or {}) | meta + async def _execute_task(self, task: Task): from services.virtual_fs import process_file @@ -78,6 +103,11 @@ class TaskQueueService: if save_to and getattr(processor, "produces_file", False): await write_file(save_to, result) task.result = "Automation task completed" + elif task.name == "offline_http_download": + from services.offline_download import run_http_download + + result_path = await run_http_download(task) + task.result = {"path": result_path} else: raise ValueError(f"Unknown task name: {task.name}") diff --git a/web/src/api/client.ts b/web/src/api/client.ts index 7440389..157dcb4 100644 --- a/web/src/api/client.ts +++ b/web/src/api/client.ts @@ -73,4 +73,5 @@ async function request(url: string, options: RequestOptions = {}): Prom export { vfsApi, type VfsEntry, type DirListing } from './vfs'; export { adaptersApi, type AdapterItem, type AdapterTypeField, type AdapterTypeMeta } from './adapters'; export { shareApi, type ShareInfo, type ShareInfoWithPassword } from './share'; +export { offlineDownloadsApi, type OfflineDownloadTask, type OfflineDownloadCreate, type TaskProgress } from './offlineDownloads'; export default request; diff --git a/web/src/api/offlineDownloads.ts b/web/src/api/offlineDownloads.ts new file mode 100644 index 0000000..a80d2d5 --- /dev/null +++ b/web/src/api/offlineDownloads.ts @@ -0,0 +1,35 @@ +import request from './client'; + +export interface TaskProgress { + stage?: string | null; + percent?: number | null; + bytes_total?: number | null; + bytes_done?: number | null; + detail?: string | null; +} + +export interface OfflineDownloadTask { + id: string; + name: string; + status: 'pending' | 'running' | 'success' | 'failed'; + result?: any; + error?: string | null; + task_info: Record; + progress?: TaskProgress | null; + meta?: Record | null; +} + +export interface OfflineDownloadCreate { + url: string; + dest_dir: string; + filename: string; +} + +export const offlineDownloadsApi = { + create: (payload: OfflineDownloadCreate) => request<{ task_id: string }>('/offline-downloads/', { + method: 'POST', + json: payload, + }), + list: () => request('/offline-downloads/'), + detail: (taskId: string) => request(`/offline-downloads/${taskId}`), +}; diff --git a/web/src/api/tasks.ts b/web/src/api/tasks.ts index 2781f71..9137eea 100644 --- a/web/src/api/tasks.ts +++ b/web/src/api/tasks.ts @@ -1,4 +1,5 @@ import request from './client'; +import type { TaskProgress } from './offlineDownloads'; export interface AutomationTask { id: number; @@ -21,6 +22,8 @@ export interface QueuedTask { result?: any; error?: string; task_info: Record; + progress?: TaskProgress | null; + meta?: Record | null; } export const tasksApi = { @@ -29,4 +32,4 @@ export const tasksApi = { update: (id: number, payload: AutomationTaskUpdate) => request(`/tasks/${id}`, { method: 'PUT', json: payload }), remove: (id: number) => request(`/tasks/${id}`, { method: 'DELETE' }), getQueue: () => request('/tasks/queue'), -}; \ No newline at end of file +}; diff --git a/web/src/i18n/locales/en.ts b/web/src/i18n/locales/en.ts index 28b51ae..65a6a0b 100644 --- a/web/src/i18n/locales/en.ts +++ b/web/src/i18n/locales/en.ts @@ -83,6 +83,27 @@ export const en = { // Offline download 'No offline download tasks': 'No offline download tasks', + 'Create Offline Download': 'Create Offline Download', + 'Offline Download Tasks': 'Offline Download Tasks', + 'URL': 'URL', + 'Please input URL': 'Please input URL', + 'Destination Folder': 'Destination Folder', + 'Select destination': 'Select destination', + 'Filename': 'Filename', + 'Please input filename': 'Please input filename', + 'Start Download': 'Start Download', + 'Stage': 'Stage', + 'Progress': 'Progress', + 'Bytes': 'Bytes', + 'Save Path': 'Save Path', + 'Queued': 'Queued', + 'Downloading': 'Downloading', + 'Transferring': 'Transferring', + 'Completed': 'Completed', + 'Pending': 'Pending', + 'Running': 'Running', + 'Success': 'Success', + 'Failed': 'Failed', // Header/File Explorer 'Home': 'Home', 'File Manager': 'File Manager', @@ -161,7 +182,6 @@ export const en = { 'Width': 'Width', 'Height': 'Height', 'No common EXIF info': 'No common EXIF info', - 'Bytes': 'Bytes', 'File Properties': 'File Properties', 'Loading file info...': 'Loading file info...', 'Basic Info': 'Basic Info', diff --git a/web/src/i18n/locales/zh.ts b/web/src/i18n/locales/zh.ts index df0155a..6e93797 100644 --- a/web/src/i18n/locales/zh.ts +++ b/web/src/i18n/locales/zh.ts @@ -11,6 +11,28 @@ export const zh = { 'Automation': '自动任务', 'My Shares': '我的分享', 'Offline Downloads': '离线下载', + 'No offline download tasks': '暂无离线下载任务', + 'Create Offline Download': '创建离线下载任务', + 'Offline Download Tasks': '离线下载任务列表', + 'URL': '下载地址', + 'Please input URL': '请输入下载地址', + 'Destination Folder': '保存目录', + 'Select destination': '请选择保存目录', + 'Filename': '文件名', + 'Please input filename': '请输入文件名', + 'Start Download': '开始下载', + 'Stage': '阶段', + 'Progress': '进度', + 'Bytes': '已传输', + 'Save Path': '保存路径', + 'Queued': '排队中', + 'Downloading': '下载中', + 'Transferring': '转存中', + 'Completed': '已完成', + 'Pending': '等待', + 'Running': '进行中', + 'Success': '成功', + 'Failed': '失败', 'Adapters': '存储挂载', 'Plugins': '应用中心', 'System Settings': '系统设置', @@ -162,7 +184,6 @@ export const zh = { 'Width': '宽度', 'Height': '高度', 'No common EXIF info': '无常见EXIF信息', - 'Bytes': '字节', 'File Properties': '文件属性', 'Loading file info...': '加载文件信息...', 'Basic Info': '基本信息', diff --git a/web/src/pages/OfflineDownloadPage.tsx b/web/src/pages/OfflineDownloadPage.tsx index 8fa58d5..57f0024 100644 --- a/web/src/pages/OfflineDownloadPage.tsx +++ b/web/src/pages/OfflineDownloadPage.tsx @@ -1,8 +1,234 @@ -import { Empty } from 'antd'; +import { memo, useCallback, useEffect, useMemo, useState } from 'react'; +import { Button, Form, Input, Modal, message, Table, Tag, Typography, Space } from 'antd'; +import type { TableColumnsType } from 'antd'; +import { FolderOpenOutlined } from '@ant-design/icons'; +import PathSelectorModal from '../components/PathSelectorModal'; +import { offlineDownloadsApi, type OfflineDownloadTask } from '../api/client'; import { useI18n } from '../i18n'; +import PageCard from '../components/PageCard'; -export default function OfflineDownloadPage() { - const { t } = useI18n(); - return ; +interface TableRow extends OfflineDownloadTask { + key: string; } + +function formatBytes(bytes?: number | null): string { + if (bytes === undefined || bytes === null) return '--'; + if (bytes === 0) return '0 B'; + const units = ['B', 'KB', 'MB', 'GB', 'TB']; + const idx = Math.min(Math.floor(Math.log(bytes) / Math.log(1024)), units.length - 1); + const val = bytes / Math.pow(1024, idx); + return `${val.toFixed(idx === 0 ? 0 : 1)} ${units[idx]}`; +} + +function stageLabel(t: (key: string) => string, stage?: string | null): string { + if (!stage) return '--'; + const map: Record = { + queued: t('Queued'), + downloading: t('Downloading'), + transferring: t('Transferring'), + completed: t('Completed'), + }; + return map[stage] ?? stage; +} + +function statusTag(status: OfflineDownloadTask['status'], t: (key: string) => string) { + switch (status) { + case 'success': + return {t('Success')}; + case 'failed': + return {t('Failed')}; + case 'running': + return {t('Running')}; + default: + return {t('Pending')}; + } +} + +const OfflineDownloadPage = memo(function OfflineDownloadPage() { + const { t } = useI18n(); + const [form] = Form.useForm(); + const [messageApi, contextHolder] = message.useMessage(); + const [tasks, setTasks] = useState([]); + const [loading, setLoading] = useState(false); + const [submitting, setSubmitting] = useState(false); + const [pathModalOpen, setPathModalOpen] = useState(false); + const [createModalOpen, setCreateModalOpen] = useState(false); + + const loadTasks = useCallback(async (withSpinner = false) => { + if (withSpinner) setLoading(true); + try { + const list = await offlineDownloadsApi.list(); + setTasks(list.map(item => ({ ...item, key: item.id }))); + } catch (err: any) { + messageApi.error(err?.message || t('Load failed')); + } finally { + if (withSpinner) setLoading(false); + } + }, [messageApi, t]); + + useEffect(() => { + loadTasks(true); + const timer = window.setInterval(() => { + loadTasks().catch(() => {}); + }, 3000); + return () => window.clearInterval(timer); + }, [loadTasks]); + + const handleSelectFolder = useCallback((path: string) => { + form.setFieldsValue({ dest_dir: path }); + setPathModalOpen(false); + }, [form]); + + const handleSubmit = useCallback(async () => { + try { + const values = await form.validateFields(); + setSubmitting(true); + const resp = await offlineDownloadsApi.create(values); + messageApi.success(`${t('Task submitted')}: ${resp.task_id}`); + form.resetFields(); + await loadTasks(true); + setCreateModalOpen(false); + } catch (err: any) { + if (err?.errorFields) return; + messageApi.error(err?.message || t('Operation failed')); + } finally { + setSubmitting(false); + } + }, [form, loadTasks, messageApi, t]); + + const columns: TableColumnsType = useMemo(() => [ + { + title: t('Filename'), + dataIndex: ['meta', 'filename'], + render: (_: any, record) => record.meta?.filename || record.task_info?.filename || '--', + }, + { + title: t('Stage'), + dataIndex: ['progress', 'stage'], + render: (_: any, record) => stageLabel(t, record.progress?.stage), + }, + { + title: t('Progress'), + dataIndex: ['progress', 'percent'], + render: (_: any, record) => { + const percent = record.progress?.percent; + return percent !== undefined && percent !== null ? `${percent.toFixed(1)}%` : '--'; + }, + }, + { + title: t('Bytes'), + render: (_: any, record) => { + const done = record.progress?.bytes_done; + const total = record.progress?.bytes_total; + if (done === undefined && total === undefined) return '--'; + if (total) { + return `${formatBytes(done)} / ${formatBytes(total)}`; + } + return formatBytes(done); + }, + }, + { + title: t('Status'), + dataIndex: 'status', + render: (status: TableRow['status'], record) => ( + + {statusTag(status, t)} + {status === 'failed' && record.error ? {record.error} : null} + + ), + }, + { + title: t('Save Path'), + dataIndex: ['meta', 'final_path'], + render: (value: string | undefined, record) => value || record.task_info?.dest_dir || '--', + }, + ], [t]); + + return ( + <> + {contextHolder} + { form.resetFields(); setCreateModalOpen(true); }}> + {t('Create Offline Download')} + + } + > + + + + { setCreateModalOpen(false); form.resetFields(); }} + onOk={handleSubmit} + okText={t('Start Download')} + okButtonProps={{ loading: submitting }} + > +
+ + + + {(() => { + const errors = form.getFieldError('dest_dir'); + const hasError = errors.length > 0; + return ( + + + + setPathModalOpen(true)} + /> + + + + + ); + })()} + + + + +
+ + setPathModalOpen(false)} + /> + + ); +}); + +export default OfflineDownloadPage;