Files
Foxel/domain/virtual_fs/processing.py
2026-02-09 13:19:28 +08:00

114 lines
4.5 KiB
Python

from typing import Any
from fastapi import HTTPException
from fastapi.responses import Response
from .transfer import VirtualFSTransferMixin
class VirtualFSProcessingMixin(VirtualFSTransferMixin):
@classmethod
async def process_file(
cls,
path: str,
processor_type: str,
config: dict,
save_to: str | None = None,
overwrite: bool = False,
) -> Any:
from domain.processors import get_processor
processor = get_processor(processor_type)
if not processor:
raise HTTPException(400, detail=f"Processor {processor_type} not found")
actual_is_dir = await cls.path_is_directory(path)
requires_input_bytes = bool(getattr(processor, "requires_input_bytes", True))
if actual_is_dir and bool(getattr(processor, "supports_directory", False)):
if save_to:
raise HTTPException(400, detail="Directory processing does not support custom save_to path")
return await processor.process(b"", path, config)
supported_exts = getattr(processor, "supported_exts", None) or []
allowed_exts = {str(ext).lower().lstrip(".") for ext in supported_exts if isinstance(ext, str)}
def matches_extension(rel_path: str) -> bool:
if not allowed_exts:
return True
if "." not in rel_path:
return "" in allowed_exts
ext = rel_path.rsplit(".", 1)[-1].lower()
return ext in allowed_exts or f".{ext}" in allowed_exts
def coerce_result_bytes(result: Any) -> bytes:
if isinstance(result, Response):
return result.body
if isinstance(result, (bytes, bytearray)):
return bytes(result)
if isinstance(result, str):
return result.encode("utf-8")
raise HTTPException(500, detail="Processor must return bytes/Response when produces_file=True")
if actual_is_dir:
if save_to:
raise HTTPException(400, detail="Directory processing does not support custom save_to path")
if not overwrite:
raise HTTPException(400, detail="Directory processing requires overwrite")
adapter_instance, adapter_model, root, rel = await cls.resolve_adapter_and_rel(path)
rel = rel.rstrip("/")
list_dir = await cls._ensure_method(adapter_instance, "list_dir")
processed_count = 0
stack: list[str] = [rel]
page_size = 200
while stack:
current = stack.pop()
page = 1
while True:
entries, total = await list_dir(root, current, page, page_size, "name", "asc")
if not entries and (total or 0) == 0:
break
for entry in entries:
name = entry.get("name")
if not name:
continue
child_rel = f"{current}/{name}" if current else name
if entry.get("is_dir"):
stack.append(child_rel)
continue
if not matches_extension(child_rel):
continue
absolute_path = cls._build_absolute_path(adapter_model.path, child_rel)
data = b""
if requires_input_bytes:
data = await cls.read_file(absolute_path)
result = await processor.process(data, absolute_path, config)
if getattr(processor, "produces_file", False):
result_bytes = coerce_result_bytes(result)
await cls.write_file(absolute_path, result_bytes)
processed_count += 1
if total is None or page * page_size >= total:
break
page += 1
return {"processed_files": processed_count}
data = b""
if requires_input_bytes:
data = await cls.read_file(path)
result = await processor.process(data, path, config)
target_path = save_to
if overwrite and not target_path:
target_path = path
if target_path and getattr(processor, "produces_file", False):
result_bytes = coerce_result_bytes(result)
await cls.write_file(target_path, result_bytes)
return {"saved_to": target_path}
return result