feat: Implement task queue service

This commit is contained in:
shiyu
2025-08-31 12:48:20 +08:00
parent b50f19bcb4
commit 50fb0b4977
5 changed files with 167 additions and 27 deletions

View File

@@ -1,7 +1,7 @@
from fastapi import APIRouter, Depends, Body
from typing import Annotated
from services.processors.registry import get_config_schemas
from services.virtual_fs import process_file
from services.task_queue import task_queue_service
from services.auth import get_current_active_user, User
from api.response import success
from pydantic import BaseModel
@@ -21,7 +21,7 @@ async def list_processors(
"name": meta["name"],
"supported_exts": meta.get("supported_exts", []),
"config_schema": meta["config_schema"],
"produces_file": meta.get("produces_file", False),
"produces_file": meta.get("produces_file", False),
})
return success(out)
@@ -40,5 +40,13 @@ async def process_file_with_processor(
req: ProcessRequest = Body(...)
):
save_to = req.path if req.overwrite else req.save_to
result = await process_file(req.path, req.processor_type, req.config, save_to)
return success(result)
task = await task_queue_service.add_task(
"process_file",
{
"path": req.path,
"processor_type": req.processor_type,
"config": req.config,
"save_to": save_to,
},
)
return success({"task_id": task.id})

View File

@@ -6,6 +6,7 @@ from schemas.tasks import AutomationTaskCreate, AutomationTaskUpdate
from api.response import success
from services.auth import get_current_active_user, User
from services.logging import LogService
from services.task_queue import task_queue_service
router = APIRouter(
prefix="/api/tasks",
@@ -15,6 +16,25 @@ router = APIRouter(
)
@router.get("/queue")
async def get_task_queue_status(
current_user: Annotated[User, Depends(get_current_active_user)],
):
tasks = task_queue_service.get_all_tasks()
return success([task.dict() for task in tasks])
@router.get("/queue/{task_id}")
async def get_task_status(
task_id: str,
current_user: Annotated[User, Depends(get_current_active_user)],
):
task = task_queue_service.get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return success(task.dict())
@router.post("/")
async def create_task(
task_in: AutomationTaskCreate,

View File

@@ -8,6 +8,7 @@ from fastapi import FastAPI
from services.middleware.logging_middleware import LoggingMiddleware
from services.middleware.exception_handler import global_exception_handler
from dotenv import load_dotenv
from services.task_queue import task_queue_service
load_dotenv()
@@ -17,9 +18,11 @@ async def lifespan(app: FastAPI):
await init_db()
await runtime_registry.refresh()
await ConfigCenter.set("APP_VERSION", VERSION)
task_queue_service.start_worker()
try:
yield
finally:
await task_queue_service.stop_worker()
await close_db()

122
services/task_queue.py Normal file
View File

@@ -0,0 +1,122 @@
import asyncio
from typing import Dict, Any
from pydantic import BaseModel, Field
import uuid
from services.logging import LogService
from enum import Enum
class TaskStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
class Task(BaseModel):
id: str = Field(default_factory=lambda: uuid.uuid4().hex)
name: str
status: TaskStatus = TaskStatus.PENDING
result: Any = None
error: str | None = None
task_info: Dict[str, Any] = {}
class TaskQueueService:
def __init__(self):
self._queue = asyncio.Queue()
self._tasks: Dict[str, Task] = {}
self._worker_task: asyncio.Task | None = None
async def add_task(self, name: str, task_info: Dict[str, Any]) -> Task:
task = Task(name=name, task_info=task_info)
self._tasks[task.id] = task
await self._queue.put(task)
await LogService.info("task_queue", f"Task {name} ({task.id}) enqueued", {"task_id": task.id, "name": name})
return task
def get_task(self, task_id: str) -> Task | None:
return self._tasks.get(task_id)
def get_all_tasks(self) -> list[Task]:
return list(self._tasks.values())
async def _execute_task(self, task: Task):
from services.virtual_fs import process_file
task.status = TaskStatus.RUNNING
await LogService.info("task_queue", f"Task {task.name} ({task.id}) started", {"task_id": task.id, "name": task.name})
try:
if task.name == "process_file":
params = task.task_info
result = await process_file(
path=params["path"],
processor_type=params["processor_type"],
config=params["config"],
save_to=params["save_to"]
)
task.result = result
elif task.name == "automation_task":
from models.database import AutomationTask
from services.processors.registry import get as get_processor
from services.virtual_fs import read_file, write_file
params = task.task_info
auto_task = await AutomationTask.get(id=params["task_id"])
path = params["path"]
processor = get_processor(auto_task.processor_type)
if not processor:
raise ValueError(f"Processor {auto_task.processor_type} not found for task {auto_task.id}")
file_content = await read_file(path)
result = await processor.process(file_content, path, auto_task.processor_config)
save_to = auto_task.processor_config.get("save_to")
if save_to and getattr(processor, "produces_file", False):
await write_file(save_to, result)
task.result = "Automation task completed"
else:
raise ValueError(f"Unknown task name: {task.name}")
task.status = TaskStatus.SUCCESS
await LogService.info("task_queue", f"Task {task.name} ({task.id}) succeeded", {"task_id": task.id, "name": task.name})
except Exception as e:
task.status = TaskStatus.FAILED
task.error = str(e)
await LogService.error("task_queue", f"Task {task.name} ({task.id}) failed: {e}", {"task_id": task.id, "name": task.name})
async def worker(self):
await LogService.info("task_queue", "Task worker started")
while True:
try:
task = await self._queue.get()
await self._execute_task(task)
except asyncio.CancelledError:
await LogService.info("task_queue", "Task worker stopped")
break
except Exception as e:
await LogService.error("task_queue", f"Error in task worker: {e}", exc_info=True)
finally:
self._queue.task_done()
def start_worker(self):
if self._worker_task is None or self._worker_task.done():
self._worker_task = asyncio.create_task(self.worker())
LogService.info("task_queue", "Task worker created.")
async def stop_worker(self):
if self._worker_task and not self._worker_task.done():
self._worker_task.cancel()
try:
await self._worker_task
except asyncio.CancelledError:
pass
finally:
self._worker_task = None
await LogService.info("task_queue", "Task worker has been stopped.")
task_queue_service = TaskQueueService()

View File

@@ -4,6 +4,9 @@ from models.database import AutomationTask
from services.processors.registry import get as get_processor
from services.logging import LogService
from services.task_queue import task_queue_service
class TaskService:
async def trigger_tasks(self, event: str, path: str):
tasks = await AutomationTask.filter(event=event, enabled=True)
@@ -21,28 +24,12 @@ class TaskService:
return True
async def execute(self, task: AutomationTask, path: str):
from services.virtual_fs import read_file, write_file
processor = get_processor(task.processor_type)
if not processor:
print(f"Processor {task.processor_type} not found for task {task.id}")
return
try:
file_content = await read_file(path)
result = await processor.process(file_content, path, task.processor_config)
save_to = task.processor_config.get("save_to")
if save_to and getattr(processor, "produces_file", False):
await write_file(save_to, result)
except Exception as e:
error_message = f"Error executing task {task.id} for path {path}: {e}"
print(error_message)
await LogService.error(
source=f"task_executor:{task.id}",
message=error_message,
details={"task_name": task.name, "event": task.event, "path": path, "processor": task.processor_type}
)
await task_queue_service.add_task(
"automation_task",
{
"task_id": task.id,
"path": path,
},
)
task_service = TaskService()