diff --git a/domain/processors/builtin/vector_index.py b/domain/processors/builtin/vector_index.py index edc9e0c..3a7fb26 100644 --- a/domain/processors/builtin/vector_index.py +++ b/domain/processors/builtin/vector_index.py @@ -114,8 +114,15 @@ class VectorIndexProcessor: } ] produces_file = False + requires_input_bytes = False async def process(self, input_bytes: bytes, path: str, config: Dict[str, Any]) -> Response: + async def ensure_input_bytes() -> bytes: + if input_bytes: + return input_bytes + from domain.virtual_fs import VirtualFSService + return await VirtualFSService.read_file(path) + action = config.get("action", "create") index_type = config.get("index_type", "vector") vector_db = VectorDBService() @@ -159,7 +166,8 @@ class VectorIndexProcessor: await vector_db.delete_vector(vector_collection, path) if file_ext in ["jpg", "jpeg", "png", "bmp"]: - processed_bytes, compression = _compress_image_for_embedding(input_bytes) + file_bytes = await ensure_input_bytes() + processed_bytes, compression = _compress_image_for_embedding(file_bytes) base64_image = base64.b64encode(processed_bytes).decode("utf-8") description = await describe_image_base64(base64_image) embedding = await get_text_embedding(description) @@ -180,7 +188,8 @@ class VectorIndexProcessor: if file_ext in ["txt", "md"]: try: - text = input_bytes.decode("utf-8") + file_bytes = await ensure_input_bytes() + text = file_bytes.decode("utf-8") except UnicodeDecodeError: return Response(content="文本文件解码失败", status_code=400) diff --git a/domain/processors/service.py b/domain/processors/service.py index 2305496..a3061b7 100644 --- a/domain/processors/service.py +++ b/domain/processors/service.py @@ -85,6 +85,44 @@ class ProcessorService: suffix = raw_suffix overwrite = req.overwrite + if produces_file: + if not overwrite and not suffix: + raise HTTPException(400, detail="Suffix is required when not overwriting files") + else: + overwrite = False + suffix = None + payload = { + "path": req.path, + "processor_type": req.processor_type, + "config": req.config, + "overwrite": overwrite, + "max_depth": req.max_depth, + "suffix": suffix, + } + task = await task_queue_service.add_task("process_directory_scan", payload) + return {"task_id": task.id} + + @classmethod + async def scan_directory(cls, req: ProcessDirectoryRequest): + if req.max_depth is not None and req.max_depth < 0: + raise HTTPException(400, detail="max_depth must be >= 0") + + is_dir = await VirtualFSService.path_is_directory(req.path) + if not is_dir: + raise HTTPException(400, detail="Path must be a directory") + + schema = get_config_schema(req.processor_type) + _processor = get(req.processor_type) + if not schema or not _processor: + raise HTTPException(404, detail="Processor not found") + + produces_file = bool(schema.get("produces_file")) + raw_suffix = req.suffix if req.suffix is not None else None + if raw_suffix is not None and raw_suffix.strip() == "": + raw_suffix = None + suffix = raw_suffix + overwrite = req.overwrite + if produces_file: if not overwrite and not suffix: raise HTTPException(400, detail="Suffix is required when not overwriting files") @@ -133,7 +171,7 @@ class ProcessorService: new_name = f"{name}{suffix_str}" return str(path_obj.with_name(new_name)) - scheduled_tasks: List[str] = [] + scheduled_count = 0 stack: List[Tuple[str, int]] = [(rel, 0)] page_size = 200 @@ -161,7 +199,7 @@ class ProcessorService: save_to = None if produces_file and not overwrite and suffix: save_to = apply_suffix(absolute_path, suffix) - task = await task_queue_service.add_task( + await task_queue_service.add_task( "process_file", { "path": absolute_path, @@ -171,16 +209,13 @@ class ProcessorService: "overwrite": overwrite, }, ) - scheduled_tasks.append(task.id) + scheduled_count += 1 if total is None or page * page_size >= total: break page += 1 - return { - "task_ids": scheduled_tasks, - "scheduled": len(scheduled_tasks), - } + return {"scheduled": scheduled_count} @classmethod async def get_source(cls, processor_type: str): diff --git a/domain/tasks/task_queue.py b/domain/tasks/task_queue.py index e9447f4..9d36886 100644 --- a/domain/tasks/task_queue.py +++ b/domain/tasks/task_queue.py @@ -86,6 +86,12 @@ class TaskQueueService: overwrite=params.get("overwrite", False), ) task.result = result + elif task.name == "process_directory_scan": + from domain.processors import ProcessDirectoryRequest, ProcessorService + + params = task.task_info or {} + req = ProcessDirectoryRequest(**params) + task.result = await ProcessorService.scan_directory(req) elif task.name == "automation_task" or self._is_processor_task(task.name): from models.database import AutomationTask diff --git a/web/src/api/processors.ts b/web/src/api/processors.ts index 578cf2d..32bddc6 100644 --- a/web/src/api/processors.ts +++ b/web/src/api/processors.ts @@ -43,7 +43,7 @@ export const processorsApi = { max_depth?: number | null; suffix?: string | null; }) => - request<{ task_ids: string[]; scheduled: number }>('/processors/process-directory', { + request<{ task_id: string }>('/processors/process-directory', { method: 'POST', json: params, }), diff --git a/web/src/i18n/locales/en.json b/web/src/i18n/locales/en.json index fd2e3be..f409915 100644 --- a/web/src/i18n/locales/en.json +++ b/web/src/i18n/locales/en.json @@ -617,6 +617,7 @@ "Source Editor": "Source Editor", "Module Path": "Module Path", "Directory processing always overwrites original files": "Directory processing always overwrites original files", + "Directory execution will enqueue one task per file": "Directory execution will enqueue a scan task, then one task per file", "No data": "No data", "Select File": "Select File", "Select Path": "Select Path", diff --git a/web/src/i18n/locales/zh.json b/web/src/i18n/locales/zh.json index c87acaf..357888f 100644 --- a/web/src/i18n/locales/zh.json +++ b/web/src/i18n/locales/zh.json @@ -608,7 +608,7 @@ "Source Editor": "源码编辑", "Module Path": "模块路径", "Directory processing always overwrites original files": "选择目录时会强制覆盖原文件", - "Directory execution will enqueue one task per file": "目录模式会为每个文件单独创建任务", + "Directory execution will enqueue one task per file": "目录模式会先创建扫描任务,后台再为每个文件创建任务", "Directory scope": "目录范围", "Current level only": "仅当前层级", "Include subdirectories": "包含子目录", diff --git a/web/src/pages/ProcessorsPage.tsx b/web/src/pages/ProcessorsPage.tsx index 2c18532..a7c9091 100644 --- a/web/src/pages/ProcessorsPage.tsx +++ b/web/src/pages/ProcessorsPage.tsx @@ -276,7 +276,7 @@ const ProcessorsPage = memo(function ProcessorsPage() { max_depth: maxDepth, suffix: suffixValue, }); - messageApi.success(`${t('Task submitted')}: ${resp.scheduled}`); + messageApi.success(`${t('Task submitted')}: ${resp.task_id}`); } } else { const payload: any = {