From 00462f2259256692c2c89e10e45543dd502c79b5 Mon Sep 17 00:00:00 2001 From: shiyu Date: Wed, 24 Sep 2025 23:18:03 +0800 Subject: [PATCH] feat(Processors): add process directory endpoint and UI support --- api/routes/processors.py | 136 ++++++++++++++++++++++++++++- web/src/api/processors.ts | 12 +++ web/src/i18n/locales/zh.ts | 11 +++ web/src/pages/ProcessorsPage.tsx | 143 +++++++++++++++++++++++++------ 4 files changed, 273 insertions(+), 29 deletions(-) diff --git a/api/routes/processors.py b/api/routes/processors.py index 08604eb..e50bd1d 100644 --- a/api/routes/processors.py +++ b/api/routes/processors.py @@ -3,6 +3,8 @@ from fastapi import APIRouter, Depends, Body, HTTPException from fastapi.concurrency import run_in_threadpool from typing import Annotated from services.processors.registry import ( + get, + get_config_schema, get_config_schemas, get_module_path, reload_processors, @@ -11,7 +13,8 @@ from services.task_queue import task_queue_service from services.auth import get_current_active_user, User from api.response import success from pydantic import BaseModel -from services.virtual_fs import path_is_directory +from services.virtual_fs import path_is_directory, resolve_adapter_and_rel +from typing import List, Optional, Tuple router = APIRouter(prefix="/api/processors", tags=["processors"]) @@ -42,6 +45,15 @@ class ProcessRequest(BaseModel): overwrite: bool = False +class ProcessDirectoryRequest(BaseModel): + path: str + processor_type: str + config: dict + overwrite: bool = True + max_depth: Optional[int] = None + suffix: Optional[str] = None + + class UpdateSourceRequest(BaseModel): source: str @@ -69,6 +81,128 @@ async def process_file_with_processor( return success({"task_id": task.id}) +@router.post("/process-directory") +async def process_directory_with_processor( + current_user: Annotated[User, Depends(get_current_active_user)], + req: ProcessDirectoryRequest = Body(...) +): + if req.max_depth is not None and req.max_depth < 0: + raise HTTPException(400, detail="max_depth must be >= 0") + + is_dir = await path_is_directory(req.path) + if not is_dir: + raise HTTPException(400, detail="Path must be a directory") + + schema = get_config_schema(req.processor_type) + _processor = get(req.processor_type) + if not schema or not _processor: + raise HTTPException(404, detail="Processor not found") + + produces_file = bool(schema.get("produces_file")) + raw_suffix = req.suffix if req.suffix is not None else None + if raw_suffix is not None and raw_suffix.strip() == "": + raw_suffix = None + suffix = raw_suffix + overwrite = req.overwrite + + if produces_file: + if not overwrite and not suffix: + raise HTTPException(400, detail="Suffix is required when not overwriting files") + else: + overwrite = False + suffix = None + + supported_exts = schema.get("supported_exts") or [] + allowed_exts = { + ext.lower().lstrip('.') + for ext in supported_exts + if isinstance(ext, str) + } + + def matches_extension(file_rel: str) -> bool: + if not allowed_exts: + return True + if '.' not in file_rel: + return '' in allowed_exts + ext = file_rel.rsplit('.', 1)[-1].lower() + return ext in allowed_exts or f'.{ext}' in allowed_exts + + adapter_instance, adapter_model, root, rel = await resolve_adapter_and_rel(req.path) + rel = rel.rstrip('/') + + list_dir = getattr(adapter_instance, "list_dir", None) + if not callable(list_dir): + raise HTTPException(501, detail="Adapter does not implement list_dir") + + def build_absolute_path(mount_path: str, rel_path: str) -> str: + rel_norm = rel_path.lstrip('/') + mount_norm = mount_path.rstrip('/') + if not mount_norm: + return '/' + rel_norm if rel_norm else '/' + return f"{mount_norm}/{rel_norm}" if rel_norm else mount_norm + + def apply_suffix(path_str: str, suffix_str: str) -> str: + path_obj = Path(path_str) + name = path_obj.name + if not name: + return path_str + if '.' in name: + base, ext = name.rsplit('.', 1) + new_name = f"{base}{suffix_str}.{ext}" + else: + new_name = f"{name}{suffix_str}" + return str(path_obj.with_name(new_name)) + + scheduled_tasks: List[str] = [] + stack: List[Tuple[str, int]] = [(rel, 0)] + page_size = 200 + + while stack: + current_rel, depth = stack.pop() + page = 1 + while True: + entries, total = await list_dir(root, current_rel, page, page_size, "name", "asc") + entries = entries or [] + if not entries and (total or 0) == 0: + break + + for entry in entries: + name = entry.get("name") + if not name: + continue + child_rel = f"{current_rel}/{name}" if current_rel else name + if entry.get("is_dir"): + if req.max_depth is None or depth < req.max_depth: + stack.append((child_rel.rstrip('/'), depth + 1)) + continue + if not matches_extension(child_rel): + continue + absolute_path = build_absolute_path(adapter_model.path, child_rel) + save_to = None + if produces_file and not overwrite and suffix: + save_to = apply_suffix(absolute_path, suffix) + task = await task_queue_service.add_task( + "process_file", + { + "path": absolute_path, + "processor_type": req.processor_type, + "config": req.config, + "save_to": save_to, + "overwrite": overwrite, + }, + ) + scheduled_tasks.append(task.id) + + if total is None or page * page_size >= total: + break + page += 1 + + return success({ + "task_ids": scheduled_tasks, + "scheduled": len(scheduled_tasks), + }) + + @router.get("/source/{processor_type}") async def get_processor_source( processor_type: str, diff --git a/web/src/api/processors.ts b/web/src/api/processors.ts index 012426f..2575a62 100644 --- a/web/src/api/processors.ts +++ b/web/src/api/processors.ts @@ -34,6 +34,18 @@ export const processorsApi = { method: 'POST', json: params, }), + processDirectory: (params: { + path: string; + processor_type: string; + config: any; + overwrite: boolean; + max_depth?: number | null; + suffix?: string | null; + }) => + request<{ task_ids: string[]; scheduled: number }>('/processors/process-directory', { + method: 'POST', + json: params, + }), getSource: (type: string) => request<{ source: string; module_path: string }>('/processors/source/' + encodeURIComponent(type), { method: 'GET', diff --git a/web/src/i18n/locales/zh.ts b/web/src/i18n/locales/zh.ts index 6900d1b..f1fc28a 100644 --- a/web/src/i18n/locales/zh.ts +++ b/web/src/i18n/locales/zh.ts @@ -418,6 +418,17 @@ export const zh = { 'Source Editor': '源码编辑', 'Module Path': '模块路径', 'Directory processing always overwrites original files': '选择目录时会强制覆盖原文件', + 'Directory execution will enqueue one task per file': '目录模式会为每个文件单独创建任务', + 'Directory scope': '目录范围', + 'Current level only': '仅当前层级', + 'Include subdirectories': '包含子目录', + 'Max depth': '最大层级', + 'Leave empty to traverse all subdirectories': '留空表示遍历所有子目录', + 'Depth must be greater or equal to 0': '层级必须大于或等于 0', + 'Output suffix': '输出后缀', + 'Suffix will be inserted before the file extension, e.g. demo_processed.mp4': '后缀会插入到文件扩展名前,例如 demo_processed.mp4', + 'Suffix such as _processed': '例如 _processed 的后缀', + 'Suffix cannot be empty': '后缀不能为空', 'No data': '暂无数据', // Path selector diff --git a/web/src/pages/ProcessorsPage.tsx b/web/src/pages/ProcessorsPage.tsx index 63d8e07..1a6ed4b 100644 --- a/web/src/pages/ProcessorsPage.tsx +++ b/web/src/pages/ProcessorsPage.tsx @@ -1,13 +1,16 @@ import { memo, useCallback, useEffect, useMemo, useState } from 'react'; import { Button, + Alert, Card, Empty, Flex, Form, Input, + InputNumber, message, Modal, + Segmented, Space, Spin, Switch, @@ -134,23 +137,36 @@ const ProcessorsPage = memo(function ProcessorsPage() { overwrite: !!selectedProcessorMeta.produces_file, save_to: undefined, config: defaults, + directory_scope: 'current', + max_depth: undefined, + suffix: undefined, }); setIsDirectory(false); }, [selectedProcessorMeta, form]); - const overwriteValue = Form.useWatch('overwrite', form) ?? false; + const producesFile = selectedProcessorMeta?.produces_file ?? false; + const overwriteWatch = Form.useWatch('overwrite', form); + const overwriteValue = producesFile ? !!overwriteWatch : false; + const directoryScope = Form.useWatch('directory_scope', form) ?? 'current'; useEffect(() => { if (overwriteValue) { - form.setFieldsValue({ save_to: undefined }); + form.setFieldsValue({ save_to: undefined, suffix: undefined }); } }, [overwriteValue, form]); useEffect(() => { if (isDirectory) { - form.setFieldsValue({ overwrite: true, save_to: undefined }); + form.setFieldsValue({ + overwrite: producesFile ? true : false, + save_to: undefined, + directory_scope: 'current', + max_depth: undefined, + }); + } else { + form.setFieldsValue({ suffix: undefined }); } - }, [isDirectory, form]); + }, [isDirectory, form, producesFile]); const handleSelectProcessor = useCallback((type: string) => { if (type === selectedType) return; @@ -232,17 +248,38 @@ const ProcessorsPage = memo(function ProcessorsPage() { } }); setRunning(true); - const payload: any = { - path: values.path, - processor_type: selectedType, - config: finalConfig, - overwrite: !!values.overwrite, - }; - if (values.save_to && !values.overwrite) { - payload.save_to = values.save_to; + const overwriteFlag = producesFile ? !!values.overwrite : false; + if (isDirectory) { + const scope: 'current' | 'recursive' = values.directory_scope || 'current'; + let maxDepth: number | null = scope === 'current' ? 0 : null; + if (scope === 'recursive' && typeof values.max_depth === 'number') { + maxDepth = values.max_depth; + } + const suffixValue = producesFile && !overwriteFlag && typeof values.suffix === 'string' + ? values.suffix.trim() || null + : null; + const resp = await processorsApi.processDirectory({ + path: values.path, + processor_type: selectedType, + config: finalConfig, + overwrite: overwriteFlag, + max_depth: maxDepth, + suffix: suffixValue, + }); + messageApi.success(`${t('Task submitted')}: ${resp.scheduled}`); + } else { + const payload: any = { + path: values.path, + processor_type: selectedType, + config: finalConfig, + overwrite: overwriteFlag, + }; + if (values.save_to && !overwriteFlag) { + payload.save_to = values.save_to; + } + const resp = await processorsApi.process(payload); + messageApi.success(`${t('Task submitted')}: ${resp.task_id}`); } - const resp = await processorsApi.process(payload); - messageApi.success(`${t('Task submitted')}: ${resp.task_id}`); } catch (err: any) { if (err?.errorFields) { return; @@ -251,7 +288,7 @@ const ProcessorsPage = memo(function ProcessorsPage() { } finally { setRunning(false); } - }, [form, messageApi, selectedProcessorMeta, selectedType, t]); + }, [form, isDirectory, messageApi, producesFile, selectedProcessorMeta, selectedType, t]); const selectedConfigPath = pathModalField === 'path' ? (selectedType ? form.getFieldValue('path') : undefined) || '/' @@ -379,11 +416,6 @@ const ProcessorsPage = memo(function ProcessorsPage() {
{selectedType ? ( <> - {isDirectory && ( - - {t('Directory processing always overwrites original files')} - - )} openPathSelector('path', 'directory')}>{t('Select Directory')} + {isDirectory && ( + + + + + + {directoryScope === 'recursive' && ( + { + if (value === undefined || value === null) return; + if (value < 0) throw new Error(t('Depth must be greater or equal to 0')); + }, + }]} + > + + + )} + + )} - - - + {producesFile && ( + + + + )} - {selectedProcessorMeta?.produces_file && !overwriteValue && ( + {isDirectory && producesFile && !overwriteValue && ( + { + if (typeof value !== 'string') return; + if (!value.trim()) { + throw new Error(t('Suffix cannot be empty')); + } + }, + }, + ]} + extra={t('Suffix will be inserted before the file extension, e.g. demo_processed.mp4')} + > + + + )} + + {!isDirectory && producesFile && !overwriteValue && (