diff --git a/api/routes/processors.py b/api/routes/processors.py
index 3ea06b1..555956a 100644
--- a/api/routes/processors.py
+++ b/api/routes/processors.py
@@ -1,7 +1,7 @@
 from fastapi import APIRouter, Depends, Body
 from typing import Annotated
 from services.processors.registry import get_config_schemas
-from services.virtual_fs import process_file
+from services.task_queue import task_queue_service
 from services.auth import get_current_active_user, User
 from api.response import success
 from pydantic import BaseModel
@@ -40,5 +40,13 @@ async def process_file_with_processor(
     req: ProcessRequest = Body(...)
 ):
     save_to = req.path if req.overwrite else req.save_to
-    result = await process_file(req.path, req.processor_type, req.config, save_to)
-    return success(result)
+    task = await task_queue_service.add_task(
+        "process_file",
+        {
+            "path": req.path,
+            "processor_type": req.processor_type,
+            "config": req.config,
+            "save_to": save_to,
+        },
+    )
+    return success({"task_id": task.id})
diff --git a/api/routes/tasks.py b/api/routes/tasks.py
index 69fac73..4640387 100644
--- a/api/routes/tasks.py
+++ b/api/routes/tasks.py
@@ -6,6 +6,7 @@ from schemas.tasks import AutomationTaskCreate, AutomationTaskUpdate
 from api.response import success
 from services.auth import get_current_active_user, User
 from services.logging import LogService
+from services.task_queue import task_queue_service
 
 router = APIRouter(
     prefix="/api/tasks",
@@ -15,6 +16,25 @@ router = APIRouter(
 )
 
 
+@router.get("/queue")
+async def get_task_queue_status(
+    current_user: Annotated[User, Depends(get_current_active_user)],
+):
+    tasks = task_queue_service.get_all_tasks()
+    return success([task.dict() for task in tasks])
+
+
+@router.get("/queue/{task_id}")
+async def get_task_status(
+    task_id: str,
+    current_user: Annotated[User, Depends(get_current_active_user)],
+):
+    task = task_queue_service.get_task(task_id)
+    if not task:
+        raise HTTPException(status_code=404, detail="Task not found")
+    return success(task.dict())
+
+
 @router.post("/")
 async def create_task(
     task_in: AutomationTaskCreate,
diff --git a/main.py b/main.py
index 1034e79..bf279a9 100644
--- a/main.py
+++ b/main.py
@@ -8,6 +8,7 @@
 from fastapi import FastAPI
 from services.middleware.logging_middleware import LoggingMiddleware
 from services.middleware.exception_handler import global_exception_handler
 from dotenv import load_dotenv
+from services.task_queue import task_queue_service
 
 load_dotenv()
@@ -17,9 +18,11 @@ async def lifespan(app: FastAPI):
     await init_db()
     await runtime_registry.refresh()
     await ConfigCenter.set("APP_VERSION", VERSION)
+    task_queue_service.start_worker()
     try:
         yield
     finally:
+        await task_queue_service.stop_worker()
         await close_db()
 
 
diff --git a/services/task_queue.py b/services/task_queue.py
new file mode 100644
index 0000000..df58679
--- /dev/null
+++ b/services/task_queue.py
@@ -0,0 +1,130 @@
+import asyncio
+from typing import Dict, Any
+from pydantic import BaseModel, Field
+import uuid
+from services.logging import LogService
+from enum import Enum
+
+
+class TaskStatus(str, Enum):
+    PENDING = "pending"
+    RUNNING = "running"
+    SUCCESS = "success"
+    FAILED = "failed"
+
+
+class Task(BaseModel):
+    # In-memory record of one queued job; `task_info` carries the job payload.
+    id: str = Field(default_factory=lambda: uuid.uuid4().hex)
+    name: str
+    status: TaskStatus = TaskStatus.PENDING
+    result: Any = None
+    error: str | None = None
+    task_info: Dict[str, Any] = {}
+
+
+class TaskQueueService:
+    def __init__(self):
+        self._queue = asyncio.Queue()
+        self._tasks: Dict[str, Task] = {}
+        self._worker_task: asyncio.Task | None = None
+
+    async def add_task(self, name: str, task_info: Dict[str, Any]) -> Task:
+        task = Task(name=name, task_info=task_info)
+        self._tasks[task.id] = task
+        await self._queue.put(task)
+        await LogService.info("task_queue", f"Task {name} ({task.id}) enqueued", {"task_id": task.id, "name": name})
+        return task
+
+    def get_task(self, task_id: str) -> Task | None:
+        return self._tasks.get(task_id)
+
+    def get_all_tasks(self) -> list[Task]:
+        return list(self._tasks.values())
+
+    async def _execute_task(self, task: Task):
+        from services.virtual_fs import process_file
+
+        task.status = TaskStatus.RUNNING
+        await LogService.info("task_queue", f"Task {task.name} ({task.id}) started", {"task_id": task.id, "name": task.name})
+
+        try:
+            if task.name == "process_file":
+                params = task.task_info
+                result = await process_file(
+                    path=params["path"],
+                    processor_type=params["processor_type"],
+                    config=params["config"],
+                    save_to=params["save_to"]
+                )
+                task.result = result
+            elif task.name == "automation_task":
+                from models.database import AutomationTask
+                from services.processors.registry import get as get_processor
+                from services.virtual_fs import read_file, write_file
+
+                params = task.task_info
+                auto_task = await AutomationTask.get(id=params["task_id"])
+                path = params["path"]
+
+                processor = get_processor(auto_task.processor_type)
+                if not processor:
+                    raise ValueError(f"Processor {auto_task.processor_type} not found for task {auto_task.id}")
+
+                file_content = await read_file(path)
+                result = await processor.process(file_content, path, auto_task.processor_config)
+
+                save_to = auto_task.processor_config.get("save_to")
+                if save_to and getattr(processor, "produces_file", False):
+                    await write_file(save_to, result)
+                task.result = "Automation task completed"
+            else:
+                raise ValueError(f"Unknown task name: {task.name}")
+
+            task.status = TaskStatus.SUCCESS
+            await LogService.info("task_queue", f"Task {task.name} ({task.id}) succeeded", {"task_id": task.id, "name": task.name})
+
+        except Exception as e:
+            task.status = TaskStatus.FAILED
+            task.error = str(e)
+            await LogService.error("task_queue", f"Task {task.name} ({task.id}) failed: {e}", {"task_id": task.id, "name": task.name})
+
+    async def worker(self):
+        await LogService.info("task_queue", "Task worker started")
+        while True:
+            # Cancellation while waiting must not call task_done(): no item was
+            # retrieved yet, and an unmatched task_done() raises ValueError.
+            try:
+                task = await self._queue.get()
+            except asyncio.CancelledError:
+                await LogService.info("task_queue", "Task worker stopped")
+                break
+            try:
+                await self._execute_task(task)
+            except asyncio.CancelledError:
+                await LogService.info("task_queue", "Task worker stopped")
+                break
+            except Exception as e:
+                await LogService.error("task_queue", f"Error in task worker: {e}")
+            finally:
+                # Exactly one task_done() per successful get(); runs before break.
+                self._queue.task_done()
+
+    def start_worker(self):
+        # Requires a running event loop (called from FastAPI lifespan startup).
+        # LogService.info is a coroutine and cannot be awaited from this sync
+        # method; worker() logs its own startup message instead.
+        if self._worker_task is None or self._worker_task.done():
+            self._worker_task = asyncio.create_task(self.worker())
+
+    async def stop_worker(self):
+        if self._worker_task and not self._worker_task.done():
+            self._worker_task.cancel()
+            try:
+                await self._worker_task
+            except asyncio.CancelledError:
+                pass
+            finally:
+                self._worker_task = None
+            await LogService.info("task_queue", "Task worker has been stopped.")
+
+
+task_queue_service = TaskQueueService()
diff --git a/services/tasks.py b/services/tasks.py
index 3b7f890..367f114 100644
--- a/services/tasks.py
+++ b/services/tasks.py
@@ -4,6 +4,9 @@
 from models.database import AutomationTask
 from services.processors.registry import get as get_processor
 from services.logging import LogService
+from services.task_queue import task_queue_service
+
+
 class TaskService:
     async def trigger_tasks(self, event: str, path: str):
         tasks = await AutomationTask.filter(event=event, enabled=True)
@@ -21,28 +24,12 @@ class TaskService:
         return True
 
     async def execute(self, task: AutomationTask, path: str):
-        from services.virtual_fs import read_file, write_file
-
-        processor = get_processor(task.processor_type)
-        if not processor:
-            print(f"Processor {task.processor_type} not found for task {task.id}")
-            return
-
-        try:
-            file_content = await read_file(path)
-            result = await processor.process(file_content, path, task.processor_config)
-
-            save_to = task.processor_config.get("save_to")
-            if save_to and getattr(processor, "produces_file", False):
-                await write_file(save_to, result)
-
-        except Exception as e:
-            error_message = f"Error executing task {task.id} for path {path}: {e}"
-            print(error_message)
-            await LogService.error(
-                source=f"task_executor:{task.id}",
-                message=error_message,
-                details={"task_name": task.name, "event": task.event, "path": path, "processor": task.processor_type}
-            )
+        await task_queue_service.add_task(
+            "automation_task",
+            {
+                "task_id": task.id,
+                "path": path,
+            },
+        )
 
 task_service = TaskService()
\ No newline at end of file