feat(Processors): add process directory endpoint and UI support

This commit is contained in:
shiyu
2025-09-24 23:18:03 +08:00
parent f0892ebcd6
commit 00462f2259
4 changed files with 273 additions and 29 deletions

View File

@@ -3,6 +3,8 @@ from fastapi import APIRouter, Depends, Body, HTTPException
from fastapi.concurrency import run_in_threadpool
from typing import Annotated
from services.processors.registry import (
get,
get_config_schema,
get_config_schemas,
get_module_path,
reload_processors,
@@ -11,7 +13,8 @@ from services.task_queue import task_queue_service
from services.auth import get_current_active_user, User
from api.response import success
from pydantic import BaseModel
from services.virtual_fs import path_is_directory
from services.virtual_fs import path_is_directory, resolve_adapter_and_rel
from typing import List, Optional, Tuple
router = APIRouter(prefix="/api/processors", tags=["processors"])
@@ -42,6 +45,15 @@ class ProcessRequest(BaseModel):
overwrite: bool = False
class ProcessDirectoryRequest(BaseModel):
path: str
processor_type: str
config: dict
overwrite: bool = True
max_depth: Optional[int] = None
suffix: Optional[str] = None
class UpdateSourceRequest(BaseModel):
source: str
@@ -69,6 +81,128 @@ async def process_file_with_processor(
return success({"task_id": task.id})
@router.post("/process-directory")
async def process_directory_with_processor(
current_user: Annotated[User, Depends(get_current_active_user)],
req: ProcessDirectoryRequest = Body(...)
):
if req.max_depth is not None and req.max_depth < 0:
raise HTTPException(400, detail="max_depth must be >= 0")
is_dir = await path_is_directory(req.path)
if not is_dir:
raise HTTPException(400, detail="Path must be a directory")
schema = get_config_schema(req.processor_type)
_processor = get(req.processor_type)
if not schema or not _processor:
raise HTTPException(404, detail="Processor not found")
produces_file = bool(schema.get("produces_file"))
raw_suffix = req.suffix if req.suffix is not None else None
if raw_suffix is not None and raw_suffix.strip() == "":
raw_suffix = None
suffix = raw_suffix
overwrite = req.overwrite
if produces_file:
if not overwrite and not suffix:
raise HTTPException(400, detail="Suffix is required when not overwriting files")
else:
overwrite = False
suffix = None
supported_exts = schema.get("supported_exts") or []
allowed_exts = {
ext.lower().lstrip('.')
for ext in supported_exts
if isinstance(ext, str)
}
def matches_extension(file_rel: str) -> bool:
if not allowed_exts:
return True
if '.' not in file_rel:
return '' in allowed_exts
ext = file_rel.rsplit('.', 1)[-1].lower()
return ext in allowed_exts or f'.{ext}' in allowed_exts
adapter_instance, adapter_model, root, rel = await resolve_adapter_and_rel(req.path)
rel = rel.rstrip('/')
list_dir = getattr(adapter_instance, "list_dir", None)
if not callable(list_dir):
raise HTTPException(501, detail="Adapter does not implement list_dir")
def build_absolute_path(mount_path: str, rel_path: str) -> str:
rel_norm = rel_path.lstrip('/')
mount_norm = mount_path.rstrip('/')
if not mount_norm:
return '/' + rel_norm if rel_norm else '/'
return f"{mount_norm}/{rel_norm}" if rel_norm else mount_norm
def apply_suffix(path_str: str, suffix_str: str) -> str:
path_obj = Path(path_str)
name = path_obj.name
if not name:
return path_str
if '.' in name:
base, ext = name.rsplit('.', 1)
new_name = f"{base}{suffix_str}.{ext}"
else:
new_name = f"{name}{suffix_str}"
return str(path_obj.with_name(new_name))
scheduled_tasks: List[str] = []
stack: List[Tuple[str, int]] = [(rel, 0)]
page_size = 200
while stack:
current_rel, depth = stack.pop()
page = 1
while True:
entries, total = await list_dir(root, current_rel, page, page_size, "name", "asc")
entries = entries or []
if not entries and (total or 0) == 0:
break
for entry in entries:
name = entry.get("name")
if not name:
continue
child_rel = f"{current_rel}/{name}" if current_rel else name
if entry.get("is_dir"):
if req.max_depth is None or depth < req.max_depth:
stack.append((child_rel.rstrip('/'), depth + 1))
continue
if not matches_extension(child_rel):
continue
absolute_path = build_absolute_path(adapter_model.path, child_rel)
save_to = None
if produces_file and not overwrite and suffix:
save_to = apply_suffix(absolute_path, suffix)
task = await task_queue_service.add_task(
"process_file",
{
"path": absolute_path,
"processor_type": req.processor_type,
"config": req.config,
"save_to": save_to,
"overwrite": overwrite,
},
)
scheduled_tasks.append(task.id)
if total is None or page * page_size >= total:
break
page += 1
return success({
"task_ids": scheduled_tasks,
"scheduled": len(scheduled_tasks),
})
@router.get("/source/{processor_type}")
async def get_processor_source(
processor_type: str,

View File

@@ -34,6 +34,18 @@ export const processorsApi = {
method: 'POST',
json: params,
}),
processDirectory: (params: {
path: string;
processor_type: string;
config: any;
overwrite: boolean;
max_depth?: number | null;
suffix?: string | null;
}) =>
request<{ task_ids: string[]; scheduled: number }>('/processors/process-directory', {
method: 'POST',
json: params,
}),
getSource: (type: string) =>
request<{ source: string; module_path: string }>('/processors/source/' + encodeURIComponent(type), {
method: 'GET',

View File

@@ -418,6 +418,17 @@ export const zh = {
'Source Editor': '源码编辑',
'Module Path': '模块路径',
'Directory processing always overwrites original files': '选择目录时会强制覆盖原文件',
'Directory execution will enqueue one task per file': '目录模式会为每个文件单独创建任务',
'Directory scope': '目录范围',
'Current level only': '仅当前层级',
'Include subdirectories': '包含子目录',
'Max depth': '最大层级',
'Leave empty to traverse all subdirectories': '留空表示遍历所有子目录',
'Depth must be greater or equal to 0': '层级必须大于或等于 0',
'Output suffix': '输出后缀',
'Suffix will be inserted before the file extension, e.g. demo_processed.mp4': '后缀会插入到文件扩展名前,例如 demo_processed.mp4',
'Suffix such as _processed': '例如 _processed 的后缀',
'Suffix cannot be empty': '后缀不能为空',
'No data': '暂无数据',
// Path selector

View File

@@ -1,13 +1,16 @@
import { memo, useCallback, useEffect, useMemo, useState } from 'react';
import {
Button,
Alert,
Card,
Empty,
Flex,
Form,
Input,
InputNumber,
message,
Modal,
Segmented,
Space,
Spin,
Switch,
@@ -134,23 +137,36 @@ const ProcessorsPage = memo(function ProcessorsPage() {
overwrite: !!selectedProcessorMeta.produces_file,
save_to: undefined,
config: defaults,
directory_scope: 'current',
max_depth: undefined,
suffix: undefined,
});
setIsDirectory(false);
}, [selectedProcessorMeta, form]);
const overwriteValue = Form.useWatch('overwrite', form) ?? false;
const producesFile = selectedProcessorMeta?.produces_file ?? false;
const overwriteWatch = Form.useWatch('overwrite', form);
const overwriteValue = producesFile ? !!overwriteWatch : false;
const directoryScope = Form.useWatch('directory_scope', form) ?? 'current';
useEffect(() => {
if (overwriteValue) {
form.setFieldsValue({ save_to: undefined });
form.setFieldsValue({ save_to: undefined, suffix: undefined });
}
}, [overwriteValue, form]);
useEffect(() => {
if (isDirectory) {
form.setFieldsValue({ overwrite: true, save_to: undefined });
form.setFieldsValue({
overwrite: producesFile ? true : false,
save_to: undefined,
directory_scope: 'current',
max_depth: undefined,
});
} else {
form.setFieldsValue({ suffix: undefined });
}
}, [isDirectory, form]);
}, [isDirectory, form, producesFile]);
const handleSelectProcessor = useCallback((type: string) => {
if (type === selectedType) return;
@@ -232,17 +248,38 @@ const ProcessorsPage = memo(function ProcessorsPage() {
}
});
setRunning(true);
const payload: any = {
path: values.path,
processor_type: selectedType,
config: finalConfig,
overwrite: !!values.overwrite,
};
if (values.save_to && !values.overwrite) {
payload.save_to = values.save_to;
const overwriteFlag = producesFile ? !!values.overwrite : false;
if (isDirectory) {
const scope: 'current' | 'recursive' = values.directory_scope || 'current';
let maxDepth: number | null = scope === 'current' ? 0 : null;
if (scope === 'recursive' && typeof values.max_depth === 'number') {
maxDepth = values.max_depth;
}
const suffixValue = producesFile && !overwriteFlag && typeof values.suffix === 'string'
? values.suffix.trim() || null
: null;
const resp = await processorsApi.processDirectory({
path: values.path,
processor_type: selectedType,
config: finalConfig,
overwrite: overwriteFlag,
max_depth: maxDepth,
suffix: suffixValue,
});
messageApi.success(`${t('Task submitted')}: ${resp.scheduled}`);
} else {
const payload: any = {
path: values.path,
processor_type: selectedType,
config: finalConfig,
overwrite: overwriteFlag,
};
if (values.save_to && !overwriteFlag) {
payload.save_to = values.save_to;
}
const resp = await processorsApi.process(payload);
messageApi.success(`${t('Task submitted')}: ${resp.task_id}`);
}
const resp = await processorsApi.process(payload);
messageApi.success(`${t('Task submitted')}: ${resp.task_id}`);
} catch (err: any) {
if (err?.errorFields) {
return;
@@ -251,7 +288,7 @@ const ProcessorsPage = memo(function ProcessorsPage() {
} finally {
setRunning(false);
}
}, [form, messageApi, selectedProcessorMeta, selectedType, t]);
}, [form, isDirectory, messageApi, producesFile, selectedProcessorMeta, selectedType, t]);
const selectedConfigPath = pathModalField === 'path'
? (selectedType ? form.getFieldValue('path') : undefined) || '/'
@@ -379,11 +416,6 @@ const ProcessorsPage = memo(function ProcessorsPage() {
<Form form={form} layout="vertical" disabled={!selectedType} style={{ padding: '12px 0' }}>
{selectedType ? (
<>
{isDirectory && (
<Text type="secondary" style={{ display: 'block', marginBottom: 12 }}>
{t('Directory processing always overwrites original files')}
</Text>
)}
<Form.Item
label={t('Target Path')}
required
@@ -402,16 +434,71 @@ const ProcessorsPage = memo(function ProcessorsPage() {
<Button onClick={() => openPathSelector('path', 'directory')}>{t('Select Directory')}</Button>
</Flex>
</Form.Item>
{isDirectory && (
<Space direction="vertical" size={12} style={{ width: '100%', marginBottom: 12 }}>
<Alert
type="info"
showIcon
message={t('Directory execution will enqueue one task per file')}
/>
<Form.Item name="directory_scope" label={t('Directory scope')} initialValue="current">
<Segmented
options={[
{ label: t('Current level only'), value: 'current' },
{ label: t('Include subdirectories'), value: 'recursive' },
]}
/>
</Form.Item>
{directoryScope === 'recursive' && (
<Form.Item
name="max_depth"
label={t('Max depth')}
extra={t('Leave empty to traverse all subdirectories')}
rules={[{
validator: async (_: any, value: number | null) => {
if (value === undefined || value === null) return;
if (value < 0) throw new Error(t('Depth must be greater or equal to 0'));
},
}]}
>
<InputNumber min={0} placeholder={t('Unlimited')} style={{ width: '100%' }} />
</Form.Item>
)}
</Space>
)}
<Form.Item
name="overwrite"
label={t('Overwrite original')}
valuePropName="checked"
>
<Switch disabled={isDirectory} />
</Form.Item>
{producesFile && (
<Form.Item
name="overwrite"
label={t('Overwrite original')}
valuePropName="checked"
>
<Switch />
</Form.Item>
)}
{selectedProcessorMeta?.produces_file && !overwriteValue && (
{isDirectory && producesFile && !overwriteValue && (
<Form.Item
name="suffix"
label={t('Output suffix')}
rules={[
{ required: true, message: t('Please input a suffix') },
{
validator: async (_: any, value: string) => {
if (typeof value !== 'string') return;
if (!value.trim()) {
throw new Error(t('Suffix cannot be empty'));
}
},
},
]}
extra={t('Suffix will be inserted before the file extension, e.g. demo_processed.mp4')}
>
<Input placeholder={t('Suffix such as _processed')} />
</Form.Item>
)}
{!isDirectory && producesFile && !overwriteValue && (
<Form.Item label={t('Save To')}>
<Flex gap={8} align="center">
<div style={{ flex: 1 }}>