feat(offline-downloads): implement offline download

This commit is contained in:
shiyu
2025-09-22 12:03:39 +08:00
parent 11c717e61d
commit 330e8fd72b
11 changed files with 630 additions and 8 deletions

View File

@@ -1,6 +1,6 @@
from fastapi import FastAPI
from .routes import adapters, virtual_fs, auth, config, processors, tasks, logs, share, backup, search, vector_db
from .routes import adapters, virtual_fs, auth, config, processors, tasks, logs, share, backup, search, vector_db, offline_downloads
from .routes import webdav
from .routes import plugins
@@ -20,3 +20,4 @@ def include_routers(app: FastAPI):
app.include_router(vector_db.router)
app.include_router(plugins.router)
app.include_router(webdav.router)
app.include_router(offline_downloads.router)

View File

@@ -0,0 +1,79 @@
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from api.response import success
from schemas.offline_downloads import OfflineDownloadCreate
from services.auth import User, get_current_active_user
from services.logging import LogService
from services.task_queue import task_queue_service, TaskProgress
from services.virtual_fs import path_is_directory
router = APIRouter(
prefix="/api/offline-downloads",
tags=["OfflineDownloads"],
)
@router.post("/")
async def create_offline_download(
payload: OfflineDownloadCreate,
current_user: Annotated[User, Depends(get_current_active_user)],
):
dest_dir = payload.dest_dir
try:
is_dir = await path_is_directory(dest_dir)
except HTTPException:
is_dir = False
if not is_dir:
raise HTTPException(400, detail="Destination directory not found")
task = await task_queue_service.add_task(
"offline_http_download",
{
"url": str(payload.url),
"dest_dir": dest_dir,
"filename": payload.filename,
},
)
await task_queue_service.update_progress(
task.id,
TaskProgress(
stage="queued",
percent=0.0,
bytes_total=None,
bytes_done=0,
detail="Waiting to start",
),
)
await LogService.action(
"route:offline_downloads",
f"Offline download task created {task.id}",
details={"url": str(payload.url), "dest_dir": dest_dir, "filename": payload.filename},
user_id=current_user.id if hasattr(current_user, "id") else None,
)
return success({"task_id": task.id})
@router.get("/")
async def list_offline_downloads(
current_user: Annotated[User, Depends(get_current_active_user)],
):
tasks = [t for t in task_queue_service.get_all_tasks() if t.name == "offline_http_download"]
data = [t.dict() for t in tasks]
return success(data)
@router.get("/{task_id}")
async def get_offline_download(
task_id: str,
current_user: Annotated[User, Depends(get_current_active_user)],
):
task = task_queue_service.get_task(task_id)
if not task or task.name != "offline_http_download":
raise HTTPException(status_code=404, detail="Task not found")
return success(task.dict())

View File

@@ -0,0 +1,7 @@
from pydantic import BaseModel, HttpUrl, Field
class OfflineDownloadCreate(BaseModel):
url: HttpUrl
dest_dir: str = Field(..., min_length=1)
filename: str = Field(..., min_length=1)

View File

@@ -0,0 +1,199 @@
import os
import time
from pathlib import Path
from typing import AsyncIterator
import aiofiles
import aiohttp
from fastapi import HTTPException
from services.logging import LogService
from services.task_queue import Task, task_queue_service, TaskProgress
from services.virtual_fs import write_file_stream, stat_file
TEMP_ROOT = Path("data/tmp/offline_downloads")
def _normalize_path(path: str) -> str:
if not path:
return "/"
if not path.startswith("/"):
path = "/" + path
if len(path) > 1 and path.endswith("/"):
path = path.rstrip("/")
return path or "/"
async def _path_exists(full_path: str) -> bool:
try:
await stat_file(full_path)
return True
except FileNotFoundError:
return False
except HTTPException as exc:
if exc.status_code == 404:
return False
raise
def _split_filename(filename: str) -> tuple[str, str]:
if not filename:
return "", ""
if filename.startswith('.') and filename.count('.') == 1:
return filename, ""
if '.' not in filename:
return filename, ""
stem, ext = filename.rsplit('.', 1)
return stem, f".{ext}"
async def _allocate_destination(dest_dir: str, filename: str) -> tuple[str, str]:
dest_dir = _normalize_path(dest_dir)
stem, suffix = _split_filename(filename)
candidate = filename
if dest_dir == "/":
base = ""
else:
base = dest_dir
attempt = 0
while await _path_exists(f"{base}/{candidate}" if base else f"/{candidate}"):
attempt += 1
if stem:
candidate = f"{stem} ({attempt}){suffix}"
else:
candidate = f"file ({attempt}){suffix}" if suffix else f"file ({attempt})"
if base:
full_path = f"{base}/{candidate}"
else:
full_path = f"/{candidate}"
return full_path, candidate
async def _iter_file(path: Path, chunk_size: int, report_cb) -> AsyncIterator[bytes]:
async with aiofiles.open(path, "rb") as f:
while True:
chunk = await f.read(chunk_size)
if not chunk:
break
await report_cb(len(chunk))
yield chunk
async def run_http_download(task: Task):
params = task.task_info
url = params.get("url")
dest_dir = params.get("dest_dir")
filename = params.get("filename")
if not url or not dest_dir or not filename:
raise ValueError("Missing required parameters for offline download")
TEMP_ROOT.mkdir(parents=True, exist_ok=True)
temp_dir = TEMP_ROOT / task.id
temp_dir.mkdir(parents=True, exist_ok=True)
temp_file = temp_dir / "payload"
bytes_total: int | None = None
bytes_done = 0
last_update = time.monotonic()
await task_queue_service.update_progress(
task.id,
TaskProgress(
stage="downloading",
percent=0.0,
bytes_total=None,
bytes_done=0,
detail="HTTP downloading",
),
)
async def report_download(delta: int, total: int | None):
nonlocal bytes_done, bytes_total, last_update
if total is not None:
bytes_total = total
bytes_done += delta
now = time.monotonic()
if delta and now - last_update < 0.5:
return
last_update = now
percent = None
total_for_display = bytes_total if bytes_total is not None else None
if bytes_total:
percent = min(100.0, round(bytes_done / bytes_total * 100, 2))
await task_queue_service.update_progress(
task.id,
TaskProgress(
stage="downloading",
percent=percent,
bytes_total=total_for_display,
bytes_done=bytes_done,
detail="HTTP downloading",
),
)
timeout = aiohttp.ClientTimeout(total=None, connect=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
if resp.status != 200:
raise ValueError(f"HTTP {resp.status} for {url}")
content_length = resp.headers.get("Content-Length")
total_size = int(content_length) if content_length else None
bytes_done = 0
async with aiofiles.open(temp_file, "wb") as f:
async for chunk in resp.content.iter_chunked(512 * 1024):
if not chunk:
continue
await f.write(chunk)
await report_download(len(chunk), total_size)
# ensure final update
await report_download(0, total_size)
file_size = os.path.getsize(temp_file)
bytes_done_transfer = 0
async def report_transfer(delta: int):
nonlocal bytes_done_transfer
bytes_done_transfer += delta
percent = min(100.0, round(bytes_done_transfer / file_size * 100, 2)) if file_size else None
await task_queue_service.update_progress(
task.id,
TaskProgress(
stage="transferring",
percent=percent,
bytes_total=file_size or None,
bytes_done=bytes_done_transfer,
detail="Saving to storage",
),
)
async def chunk_iter() -> AsyncIterator[bytes]:
async for chunk in _iter_file(temp_file, 512 * 1024, report_transfer):
yield chunk
final_path, resolved_name = await _allocate_destination(dest_dir, filename)
await task_queue_service.update_progress(
task.id,
TaskProgress(stage="transferring", percent=0.0, bytes_total=file_size or None, bytes_done=0, detail="Saving to storage"),
)
await write_file_stream(final_path, chunk_iter())
await task_queue_service.update_progress(
task.id,
TaskProgress(stage="completed", percent=100.0, bytes_total=file_size or None, bytes_done=file_size, detail="Completed"),
)
await task_queue_service.update_meta(task.id, {"final_path": final_path, "filename": resolved_name})
try:
os.remove(temp_file)
temp_dir.rmdir()
except Exception:
await LogService.info("offline_download", f"Temp cleanup failed for task {task.id}")
return final_path

View File

@@ -13,6 +13,14 @@ class TaskStatus(str, Enum):
FAILED = "failed"
class TaskProgress(BaseModel):
stage: str | None = None
percent: float | None = None
bytes_total: int | None = None
bytes_done: int | None = None
detail: str | None = None
class Task(BaseModel):
id: str = Field(default_factory=lambda: uuid.uuid4().hex)
name: str
@@ -20,6 +28,8 @@ class Task(BaseModel):
result: Any = None
error: str | None = None
task_info: Dict[str, Any] = {}
progress: TaskProgress | None = None
meta: Dict[str, Any] | None = None
class TaskQueueService:
@@ -41,6 +51,21 @@ class TaskQueueService:
def get_all_tasks(self) -> list[Task]:
return list(self._tasks.values())
async def update_progress(self, task_id: str, progress: TaskProgress | Dict[str, Any]):
task = self._tasks.get(task_id)
if not task:
return
if isinstance(progress, TaskProgress):
task.progress = progress
else:
task.progress = TaskProgress(**progress)
async def update_meta(self, task_id: str, meta: Dict[str, Any]):
task = self._tasks.get(task_id)
if not task:
return
task.meta = (task.meta or {}) | meta
async def _execute_task(self, task: Task):
from services.virtual_fs import process_file
@@ -78,6 +103,11 @@ class TaskQueueService:
if save_to and getattr(processor, "produces_file", False):
await write_file(save_to, result)
task.result = "Automation task completed"
elif task.name == "offline_http_download":
from services.offline_download import run_http_download
result_path = await run_http_download(task)
task.result = {"path": result_path}
else:
raise ValueError(f"Unknown task name: {task.name}")

View File

@@ -73,4 +73,5 @@ async function request<T = any>(url: string, options: RequestOptions = {}): Prom
export { vfsApi, type VfsEntry, type DirListing } from './vfs';
export { adaptersApi, type AdapterItem, type AdapterTypeField, type AdapterTypeMeta } from './adapters';
export { shareApi, type ShareInfo, type ShareInfoWithPassword } from './share';
export { offlineDownloadsApi, type OfflineDownloadTask, type OfflineDownloadCreate, type TaskProgress } from './offlineDownloads';
export default request;

View File

@@ -0,0 +1,35 @@
import request from './client';
export interface TaskProgress {
stage?: string | null;
percent?: number | null;
bytes_total?: number | null;
bytes_done?: number | null;
detail?: string | null;
}
export interface OfflineDownloadTask {
id: string;
name: string;
status: 'pending' | 'running' | 'success' | 'failed';
result?: any;
error?: string | null;
task_info: Record<string, any>;
progress?: TaskProgress | null;
meta?: Record<string, any> | null;
}
export interface OfflineDownloadCreate {
url: string;
dest_dir: string;
filename: string;
}
export const offlineDownloadsApi = {
create: (payload: OfflineDownloadCreate) => request<{ task_id: string }>('/offline-downloads/', {
method: 'POST',
json: payload,
}),
list: () => request<OfflineDownloadTask[]>('/offline-downloads/'),
detail: (taskId: string) => request<OfflineDownloadTask>(`/offline-downloads/${taskId}`),
};

View File

@@ -1,4 +1,5 @@
import request from './client';
import type { TaskProgress } from './offlineDownloads';
export interface AutomationTask {
id: number;
@@ -21,6 +22,8 @@ export interface QueuedTask {
result?: any;
error?: string;
task_info: Record<string, any>;
progress?: TaskProgress | null;
meta?: Record<string, any> | null;
}
export const tasksApi = {
@@ -29,4 +32,4 @@ export const tasksApi = {
update: (id: number, payload: AutomationTaskUpdate) => request<AutomationTask>(`/tasks/${id}`, { method: 'PUT', json: payload }),
remove: (id: number) => request<void>(`/tasks/${id}`, { method: 'DELETE' }),
getQueue: () => request<QueuedTask[]>('/tasks/queue'),
};
};

View File

@@ -83,6 +83,27 @@ export const en = {
// Offline download
'No offline download tasks': 'No offline download tasks',
'Create Offline Download': 'Create Offline Download',
'Offline Download Tasks': 'Offline Download Tasks',
'URL': 'URL',
'Please input URL': 'Please input URL',
'Destination Folder': 'Destination Folder',
'Select destination': 'Select destination',
'Filename': 'Filename',
'Please input filename': 'Please input filename',
'Start Download': 'Start Download',
'Stage': 'Stage',
'Progress': 'Progress',
'Bytes': 'Bytes',
'Save Path': 'Save Path',
'Queued': 'Queued',
'Downloading': 'Downloading',
'Transferring': 'Transferring',
'Completed': 'Completed',
'Pending': 'Pending',
'Running': 'Running',
'Success': 'Success',
'Failed': 'Failed',
// Header/File Explorer
'Home': 'Home',
'File Manager': 'File Manager',
@@ -161,7 +182,6 @@ export const en = {
'Width': 'Width',
'Height': 'Height',
'No common EXIF info': 'No common EXIF info',
'Bytes': 'Bytes',
'File Properties': 'File Properties',
'Loading file info...': 'Loading file info...',
'Basic Info': 'Basic Info',

View File

@@ -11,6 +11,28 @@ export const zh = {
'Automation': '自动任务',
'My Shares': '我的分享',
'Offline Downloads': '离线下载',
'No offline download tasks': '暂无离线下载任务',
'Create Offline Download': '创建离线下载任务',
'Offline Download Tasks': '离线下载任务列表',
'URL': '下载地址',
'Please input URL': '请输入下载地址',
'Destination Folder': '保存目录',
'Select destination': '请选择保存目录',
'Filename': '文件名',
'Please input filename': '请输入文件名',
'Start Download': '开始下载',
'Stage': '阶段',
'Progress': '进度',
'Bytes': '已传输',
'Save Path': '保存路径',
'Queued': '排队中',
'Downloading': '下载中',
'Transferring': '转存中',
'Completed': '已完成',
'Pending': '等待',
'Running': '进行中',
'Success': '成功',
'Failed': '失败',
'Adapters': '存储挂载',
'Plugins': '应用中心',
'System Settings': '系统设置',
@@ -162,7 +184,6 @@ export const zh = {
'Width': '宽度',
'Height': '高度',
'No common EXIF info': '无常见EXIF信息',
'Bytes': '字节',
'File Properties': '文件属性',
'Loading file info...': '加载文件信息...',
'Basic Info': '基本信息',

View File

@@ -1,8 +1,234 @@
import { Empty } from 'antd';
import { memo, useCallback, useEffect, useMemo, useState } from 'react';
import { Button, Form, Input, Modal, message, Table, Tag, Typography, Space } from 'antd';
import type { TableColumnsType } from 'antd';
import { FolderOpenOutlined } from '@ant-design/icons';
import PathSelectorModal from '../components/PathSelectorModal';
import { offlineDownloadsApi, type OfflineDownloadTask } from '../api/client';
import { useI18n } from '../i18n';
import PageCard from '../components/PageCard';
export default function OfflineDownloadPage() {
const { t } = useI18n();
return <Empty description={t('No offline download tasks')} />;
interface TableRow extends OfflineDownloadTask {
key: string;
}
function formatBytes(bytes?: number | null): string {
if (bytes === undefined || bytes === null) return '--';
if (bytes === 0) return '0 B';
const units = ['B', 'KB', 'MB', 'GB', 'TB'];
const idx = Math.min(Math.floor(Math.log(bytes) / Math.log(1024)), units.length - 1);
const val = bytes / Math.pow(1024, idx);
return `${val.toFixed(idx === 0 ? 0 : 1)} ${units[idx]}`;
}
function stageLabel(t: (key: string) => string, stage?: string | null): string {
if (!stage) return '--';
const map: Record<string, string> = {
queued: t('Queued'),
downloading: t('Downloading'),
transferring: t('Transferring'),
completed: t('Completed'),
};
return map[stage] ?? stage;
}
function statusTag(status: OfflineDownloadTask['status'], t: (key: string) => string) {
switch (status) {
case 'success':
return <Tag color="green">{t('Success')}</Tag>;
case 'failed':
return <Tag color="red">{t('Failed')}</Tag>;
case 'running':
return <Tag color="blue">{t('Running')}</Tag>;
default:
return <Tag color="default">{t('Pending')}</Tag>;
}
}
const OfflineDownloadPage = memo(function OfflineDownloadPage() {
const { t } = useI18n();
const [form] = Form.useForm();
const [messageApi, contextHolder] = message.useMessage();
const [tasks, setTasks] = useState<TableRow[]>([]);
const [loading, setLoading] = useState(false);
const [submitting, setSubmitting] = useState(false);
const [pathModalOpen, setPathModalOpen] = useState(false);
const [createModalOpen, setCreateModalOpen] = useState(false);
const loadTasks = useCallback(async (withSpinner = false) => {
if (withSpinner) setLoading(true);
try {
const list = await offlineDownloadsApi.list();
setTasks(list.map(item => ({ ...item, key: item.id })));
} catch (err: any) {
messageApi.error(err?.message || t('Load failed'));
} finally {
if (withSpinner) setLoading(false);
}
}, [messageApi, t]);
useEffect(() => {
loadTasks(true);
const timer = window.setInterval(() => {
loadTasks().catch(() => {});
}, 3000);
return () => window.clearInterval(timer);
}, [loadTasks]);
const handleSelectFolder = useCallback((path: string) => {
form.setFieldsValue({ dest_dir: path });
setPathModalOpen(false);
}, [form]);
const handleSubmit = useCallback(async () => {
try {
const values = await form.validateFields();
setSubmitting(true);
const resp = await offlineDownloadsApi.create(values);
messageApi.success(`${t('Task submitted')}: ${resp.task_id}`);
form.resetFields();
await loadTasks(true);
setCreateModalOpen(false);
} catch (err: any) {
if (err?.errorFields) return;
messageApi.error(err?.message || t('Operation failed'));
} finally {
setSubmitting(false);
}
}, [form, loadTasks, messageApi, t]);
const columns: TableColumnsType<TableRow> = useMemo(() => [
{
title: t('Filename'),
dataIndex: ['meta', 'filename'],
render: (_: any, record) => record.meta?.filename || record.task_info?.filename || '--',
},
{
title: t('Stage'),
dataIndex: ['progress', 'stage'],
render: (_: any, record) => stageLabel(t, record.progress?.stage),
},
{
title: t('Progress'),
dataIndex: ['progress', 'percent'],
render: (_: any, record) => {
const percent = record.progress?.percent;
return percent !== undefined && percent !== null ? `${percent.toFixed(1)}%` : '--';
},
},
{
title: t('Bytes'),
render: (_: any, record) => {
const done = record.progress?.bytes_done;
const total = record.progress?.bytes_total;
if (done === undefined && total === undefined) return '--';
if (total) {
return `${formatBytes(done)} / ${formatBytes(total)}`;
}
return formatBytes(done);
},
},
{
title: t('Status'),
dataIndex: 'status',
render: (status: TableRow['status'], record) => (
<Space>
{statusTag(status, t)}
{status === 'failed' && record.error ? <Typography.Text type="danger">{record.error}</Typography.Text> : null}
</Space>
),
},
{
title: t('Save Path'),
dataIndex: ['meta', 'final_path'],
render: (value: string | undefined, record) => value || record.task_info?.dest_dir || '--',
},
], [t]);
return (
<>
{contextHolder}
<PageCard
title={t('Offline Downloads')}
extra={
<Button type="primary" onClick={() => { form.resetFields(); setCreateModalOpen(true); }}>
{t('Create Offline Download')}
</Button>
}
>
<Table
columns={columns}
dataSource={tasks}
loading={loading}
pagination={false}
locale={{ emptyText: t('No offline download tasks') }}
rowKey="id"
style={{ marginBottom: 0 }}
/>
</PageCard>
<Modal
title={t('Create Offline Download')}
open={createModalOpen}
onCancel={() => { setCreateModalOpen(false); form.resetFields(); }}
onOk={handleSubmit}
okText={t('Start Download')}
okButtonProps={{ loading: submitting }}
>
<Form form={form} layout="vertical">
<Form.Item name="url" label={t('URL')} rules={[{ required: true, message: t('Please input URL') }]}>
<Input placeholder="https://example.com/file" />
</Form.Item>
{(() => {
const errors = form.getFieldError('dest_dir');
const hasError = errors.length > 0;
return (
<Form.Item
label={t('Destination Folder')}
required
validateStatus={hasError ? 'error' : undefined}
help={hasError ? errors[0] : undefined}
>
<Input.Group compact style={{ display: 'flex' }}>
<Form.Item
name="dest_dir"
noStyle
rules={[{ required: true, message: t('Select destination') }]}
>
<Input
readOnly
placeholder={t('Select destination')}
style={{ width: 'calc(100% - 120px)', cursor: 'pointer' }}
onClick={() => setPathModalOpen(true)}
/>
</Form.Item>
<Button
type="default"
icon={<FolderOpenOutlined />}
onClick={() => setPathModalOpen(true)}
style={{ width: 120 }}
>
{t('Select Folder')}
</Button>
</Input.Group>
</Form.Item>
);
})()}
<Form.Item name="filename" label={t('Filename')} rules={[{ required: true, message: t('Please input filename') }]}>
<Input placeholder="example.zip" />
</Form.Item>
</Form>
</Modal>
<PathSelectorModal
open={pathModalOpen}
mode="directory"
initialPath={form.getFieldValue('dest_dir') || '/'}
onOk={handleSelectFolder}
onCancel={() => setPathModalOpen(false)}
/>
</>
);
});
export default OfflineDownloadPage;