From 7d35c10d712014b28ca9189ea71ff1364b4cf054 Mon Sep 17 00:00:00 2001 From: shiyu Date: Mon, 22 Sep 2025 19:08:14 +0800 Subject: [PATCH] feat(task-queue): implement task queue management with settings and UI integration --- api/routes/tasks.py | 39 ++++- schemas/tasks.py | 10 +- services/task_queue.py | 132 ++++++++++---- services/tasks.py | 4 +- web/src/api/tasks.ts | 11 ++ web/src/i18n/locales/en.ts | 19 +- web/src/i18n/locales/zh.ts | 16 ++ web/src/layout/nav.ts | 2 + web/src/pages/TaskQueuePage.tsx | 297 ++++++++++++++++++++++++++++++++ web/src/pages/TasksPage.tsx | 59 +------ web/src/router/LayoutShell.tsx | 2 + 11 files changed, 498 insertions(+), 93 deletions(-) create mode 100644 web/src/pages/TaskQueuePage.tsx diff --git a/api/routes/tasks.py b/api/routes/tasks.py index 4640387..97c0182 100644 --- a/api/routes/tasks.py +++ b/api/routes/tasks.py @@ -2,11 +2,17 @@ from fastapi import APIRouter, Depends, HTTPException from typing import Annotated from models.database import AutomationTask -from schemas.tasks import AutomationTaskCreate, AutomationTaskUpdate +from schemas.tasks import ( + AutomationTaskCreate, + AutomationTaskUpdate, + TaskQueueSettings, + TaskQueueSettingsResponse, +) from api.response import success from services.auth import get_current_active_user, User from services.logging import LogService from services.task_queue import task_queue_service +from services.config import ConfigCenter router = APIRouter( prefix="/api/tasks", @@ -24,6 +30,37 @@ async def get_task_queue_status( return success([task.dict() for task in tasks]) +@router.get("/queue/settings") +async def get_task_queue_settings( + current_user: Annotated[User, Depends(get_current_active_user)], +): + payload = TaskQueueSettingsResponse( + concurrency=task_queue_service.get_concurrency(), + active_workers=task_queue_service.get_active_worker_count(), + ) + return success(payload.model_dump()) + + +@router.post("/queue/settings") +async def update_task_queue_settings( + settings: TaskQueueSettings, + current_user: Annotated[User, Depends(get_current_active_user)], +): + await task_queue_service.set_concurrency(settings.concurrency) + await ConfigCenter.set("TASK_QUEUE_CONCURRENCY", str(task_queue_service.get_concurrency())) + await LogService.action( + "route:tasks", + "Updated task queue settings", + details={"concurrency": settings.concurrency}, + user_id=getattr(current_user, "id", None), + ) + payload = TaskQueueSettingsResponse( + concurrency=task_queue_service.get_concurrency(), + active_workers=task_queue_service.get_active_worker_count(), + ) + return success(payload.model_dump()) + + @router.get("/queue/{task_id}") async def get_task_status( task_id: str, diff --git a/schemas/tasks.py b/schemas/tasks.py index 418c417..b1253bf 100644 --- a/schemas/tasks.py +++ b/schemas/tasks.py @@ -1,4 +1,4 @@ -from pydantic import BaseModel +from pydantic import BaseModel, Field from typing import Optional, Dict, Any @@ -29,3 +29,11 @@ class AutomationTaskRead(AutomationTaskBase): class Config: from_attributes = True + + +class TaskQueueSettings(BaseModel): + concurrency: int = Field(..., ge=1, description="Desired number of concurrent task workers") + + +class TaskQueueSettingsResponse(TaskQueueSettings): + active_workers: int = Field(..., ge=0, description="Currently running worker count") diff --git a/services/task_queue.py b/services/task_queue.py index 3e5f78d..b86d51d 100644 --- a/services/task_queue.py +++ b/services/task_queue.py @@ -32,11 +32,16 @@ class Task(BaseModel): meta: Dict[str, Any] | None = None +_SENTINEL = object() + + class TaskQueueService: def __init__(self): - self._queue = asyncio.Queue() + self._queue: asyncio.Queue[Task | object] = asyncio.Queue() self._tasks: Dict[str, Task] = {} - self._worker_task: asyncio.Task | None = None + self._worker_tasks: list[asyncio.Task] = [] + self._concurrency: int = 1 + self._worker_seq: int = 0 async def add_task(self, name: str, task_info: Dict[str, Any]) -> Task: task = Task(name=name, task_info=task_info) @@ -83,7 +88,7 @@ class TaskQueueService: overwrite=params.get("overwrite", False), ) task.result = result - elif task.name == "automation_task": + elif task.name == "automation_task" or self._is_processor_task(task.name): from models.database import AutomationTask from services.processors.registry import get as get_processor from services.virtual_fs import read_file, write_file @@ -92,9 +97,21 @@ class TaskQueueService: auto_task = await AutomationTask.get(id=params["task_id"]) path = params["path"] - processor = get_processor(auto_task.processor_type) + processor_type = auto_task.processor_type if task.name == "automation_task" else task.name + processor = get_processor(processor_type) if not processor: - raise ValueError(f"Processor {auto_task.processor_type} not found for task {auto_task.id}") + raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}") + + if processor_type != auto_task.processor_type: + await LogService.warning( + "task_queue", + "Processor type mismatch; falling back to stored type", + {"task_id": auto_task.id, "expected": auto_task.processor_type, "got": processor_type}, + ) + processor_type = auto_task.processor_type + processor = get_processor(processor_type) + if not processor: + raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}") file_content = await read_file(path) result = await processor.process(file_content, path, auto_task.processor_config) @@ -124,35 +141,88 @@ class TaskQueueService: task.error = str(e) await LogService.error("task_queue", f"Task {task.name} ({task.id}) failed: {e}", {"task_id": task.id, "name": task.name}) - async def worker(self): - await LogService.info("task_queue", "Task worker started") - while True: - try: - task = await self._queue.get() - await self._execute_task(task) - except asyncio.CancelledError: - await LogService.info("task_queue", "Task worker stopped") - break - except Exception as e: - await LogService.error("task_queue", f"Error in task worker: {e}", exc_info=True) - finally: - self._queue.task_done() + def _cleanup_workers(self): + self._worker_tasks = [task for task in self._worker_tasks if not task.done()] - async def start_worker(self): - if self._worker_task is None or self._worker_task.done(): - self._worker_task = asyncio.create_task(self.worker()) - await LogService.info("task_queue", "Task worker created.") + def _is_processor_task(self, task_name: str) -> bool: + try: + from services.processors.registry import get as get_processor + + return get_processor(task_name) is not None + except Exception: + return False + + async def _ensure_worker_count(self): + self._cleanup_workers() + current = len(self._worker_tasks) + if current < self._concurrency: + for _ in range(self._concurrency - current): + self._worker_seq += 1 + worker_id = self._worker_seq + worker_task = asyncio.create_task(self._worker_loop(worker_id)) + self._worker_tasks.append(worker_task) + await LogService.info("task_queue", "Task workers adjusted", {"active_workers": len(self._worker_tasks), "target": self._concurrency}) + elif current > self._concurrency: + for _ in range(current - self._concurrency): + await self._queue.put(_SENTINEL) + await LogService.info("task_queue", "Task workers scaling down", {"active_workers": len(self._worker_tasks), "target": self._concurrency}) + + async def _worker_loop(self, worker_id: int): + current_task = asyncio.current_task() + await LogService.info("task_queue", f"Worker {worker_id} started") + try: + while True: + job = await self._queue.get() + if job is _SENTINEL: + self._queue.task_done() + break + try: + await self._execute_task(job) + except Exception as e: + await LogService.error( + "task_queue", + f"Error executing task {job.id}: {e}", + {"task_id": job.id, "name": job.name}, + ) + finally: + self._queue.task_done() + finally: + if current_task in self._worker_tasks: + self._worker_tasks.remove(current_task) # type: ignore[arg-type] + await LogService.info("task_queue", f"Worker {worker_id} stopped") + + async def start_worker(self, concurrency: int | None = None): + if concurrency is None: + from services.config import ConfigCenter + + stored_value = await ConfigCenter.get("TASK_QUEUE_CONCURRENCY", self._concurrency) + try: + concurrency = int(stored_value) + except (TypeError, ValueError): + concurrency = self._concurrency + await self.set_concurrency(concurrency) + + async def set_concurrency(self, value: int): + value = max(1, int(value)) + if value != self._concurrency: + self._concurrency = value + await self._ensure_worker_count() async def stop_worker(self): - if self._worker_task and not self._worker_task.done(): - self._worker_task.cancel() - try: - await self._worker_task - except asyncio.CancelledError: - pass - finally: - self._worker_task = None - await LogService.info("task_queue", "Task worker has been stopped.") + self._cleanup_workers() + for _ in range(len(self._worker_tasks)): + await self._queue.put(_SENTINEL) + if self._worker_tasks: + await asyncio.gather(*self._worker_tasks, return_exceptions=True) + self._worker_tasks.clear() + await LogService.info("task_queue", "Task workers have been stopped.") + + def get_concurrency(self) -> int: + return self._concurrency + + def get_active_worker_count(self) -> int: + self._cleanup_workers() + return len(self._worker_tasks) task_queue_service = TaskQueueService() diff --git a/services/tasks.py b/services/tasks.py index 367f114..e7138fd 100644 --- a/services/tasks.py +++ b/services/tasks.py @@ -25,11 +25,11 @@ class TaskService: async def execute(self, task: AutomationTask, path: str): await task_queue_service.add_task( - "automation_task", + task.processor_type, { "task_id": task.id, "path": path, }, ) -task_service = TaskService() \ No newline at end of file +task_service = TaskService() diff --git a/web/src/api/tasks.ts b/web/src/api/tasks.ts index 9137eea..e04f58f 100644 --- a/web/src/api/tasks.ts +++ b/web/src/api/tasks.ts @@ -26,10 +26,21 @@ export interface QueuedTask { meta?: Record | null; } +export interface TaskQueueSettings { + concurrency: number; + active_workers: number; +} + +export interface TaskQueueSettingsUpdate { + concurrency: number; +} + export const tasksApi = { list: () => request('/tasks/'), create: (payload: AutomationTaskCreate) => request('/tasks/', { method: 'POST', json: payload }), update: (id: number, payload: AutomationTaskUpdate) => request(`/tasks/${id}`, { method: 'PUT', json: payload }), remove: (id: number) => request(`/tasks/${id}`, { method: 'DELETE' }), getQueue: () => request('/tasks/queue'), + getQueueSettings: () => request('/tasks/queue/settings'), + updateQueueSettings: (payload: TaskQueueSettingsUpdate) => request('/tasks/queue/settings', { method: 'POST', json: payload }), }; diff --git a/web/src/i18n/locales/en.ts b/web/src/i18n/locales/en.ts index 96e5cf4..e8aca2c 100644 --- a/web/src/i18n/locales/en.ts +++ b/web/src/i18n/locales/en.ts @@ -179,6 +179,24 @@ export const en = { 'Copy Markdown': 'Copy Markdown', 'Close': 'Close', + // Task queue + 'Task Queue': 'Task Queue', + 'Last updated at {time}': 'Last updated at {time}', + 'Total Tasks': 'Total Tasks', + 'Running Tasks': 'Running Tasks', + 'Waiting Tasks': 'Waiting Tasks', + 'Failed Tasks': 'Failed Tasks', + 'Active Workers': 'Active Workers', + 'Task Type': 'Task Type', + 'Search by name or ID': 'Search by name or ID', + 'Filter by status': 'Filter by status', + 'Queue Concurrency': 'Queue Concurrency', + 'Settings saved': 'Settings saved', + 'Expand': 'Expand', + 'Adjust worker concurrency immediately': 'Adjust worker concurrency immediately', + 'Auto': 'Auto', + 'Manual': 'Manual', + // File detail 'Camera Make': 'Camera Make', 'Camera Model': 'Camera Model', @@ -307,7 +325,6 @@ export const en = { // Tasks 'Automation Tasks': 'Automation Tasks', - 'Running Tasks': 'Running Tasks', 'Create Task': 'Create Task', 'Edit Task': 'Edit Task', 'Create Automation Task': 'Create Automation Task', diff --git a/web/src/i18n/locales/zh.ts b/web/src/i18n/locales/zh.ts index 35df4ef..6900d1b 100644 --- a/web/src/i18n/locales/zh.ts +++ b/web/src/i18n/locales/zh.ts @@ -181,6 +181,22 @@ export const zh = { 'Copy Markdown': '复制 Markdown', 'Close': '关闭', + // Task queue + 'Task Queue': '任务队列', + 'Last updated at {time}': '上次刷新时间 {time}', + 'Total Tasks': '任务总数', + 'Waiting Tasks': '等待中的任务', + 'Failed Tasks': '失败的任务', + 'Active Workers': '活跃 Worker 数', + 'Task Type': '任务类型', + 'Search by name or ID': '按名称或 ID 搜索', + 'Filter by status': '按状态筛选', + 'Queue Concurrency': '队列并发数', + 'Settings saved': '设置已保存', + 'Expand': '展开', + 'Adjust worker concurrency immediately': '立即调整任务并发数', + 'Auto': '自动', + 'Manual': '手动', // File detail 'Camera Make': '设备品牌', 'Camera Model': '设备型号', diff --git a/web/src/layout/nav.ts b/web/src/layout/nav.ts index f7d0c3f..ee84dcd 100644 --- a/web/src/layout/nav.ts +++ b/web/src/layout/nav.ts @@ -10,6 +10,7 @@ import { DatabaseOutlined, AppstoreOutlined, CodeOutlined, + ClockCircleOutlined, } from '@ant-design/icons'; import type { ReactNode } from 'react'; @@ -30,6 +31,7 @@ export const navGroups: NavGroup[] = [ children: [ { key: 'processors', icon: React.createElement(CodeOutlined), label: 'Processors' }, { key: 'tasks', icon: React.createElement(RobotOutlined), label: 'Automation' }, + { key: 'task-queue', icon: React.createElement(ClockCircleOutlined), label: 'Task Queue' }, { key: 'share', icon: React.createElement(ShareAltOutlined), label: 'My Shares' }, { key: 'offline', icon: React.createElement(CloudDownloadOutlined), label: 'Offline Downloads' }, { key: 'adapters', icon: React.createElement(ApiOutlined), label: 'Adapters' }, diff --git a/web/src/pages/TaskQueuePage.tsx b/web/src/pages/TaskQueuePage.tsx new file mode 100644 index 0000000..7176cbd --- /dev/null +++ b/web/src/pages/TaskQueuePage.tsx @@ -0,0 +1,297 @@ +import { memo, useCallback, useEffect, useMemo, useState } from 'react'; +import { Button, Card, Col, Form, Input, InputNumber, message, Progress, Row, Segmented, Select, Space, Table, Tag, Tooltip, Typography } from 'antd'; +import type { ColumnsType } from 'antd/es/table'; +import PageCard from '../components/PageCard'; +import { tasksApi, type QueuedTask, type TaskQueueSettings } from '../api/tasks'; +import { useI18n } from '../i18n'; +import { SearchOutlined } from '@ant-design/icons'; + +type QueueStatus = QueuedTask['status']; + +const AUTO_REFRESH_INTERVAL = 5000; + +const TaskQueuePage = memo(function TaskQueuePage() { + const { t } = useI18n(); + const [messageApi, contextHolder] = message.useMessage(); + const [tasks, setTasks] = useState([]); + const [loading, setLoading] = useState(false); + const [keyword, setKeyword] = useState(''); + const [statusFilter, setStatusFilter] = useState(['pending', 'running']); + const [autoRefresh, setAutoRefresh] = useState<'on' | 'off'>('on'); + const [settings, setSettings] = useState({ concurrency: 1, active_workers: 0 }); + const [concurrencyDraft, setConcurrencyDraft] = useState(1); + const [settingsLoading, setSettingsLoading] = useState(false); + const [lastUpdated, setLastUpdated] = useState(null); + + const statusOptions = useMemo(() => ([ + { label: t('Pending'), value: 'pending' }, + { label: t('Running'), value: 'running' }, + { label: t('Success'), value: 'success' }, + { label: t('Failed'), value: 'failed' }, + ]), [t]); + + const loadTasks = useCallback(async (withSpinner = false) => { + if (withSpinner) setLoading(true); + try { + const data = await tasksApi.getQueue(); + setTasks(data); + setLastUpdated(Date.now()); + } catch (err) { + const msg = err instanceof Error && err.message ? err.message : t('Load failed'); + messageApi.error(msg); + } finally { + if (withSpinner) setLoading(false); + } + }, [messageApi, t]); + + const loadSettings = useCallback(async () => { + try { + const data = await tasksApi.getQueueSettings(); + setSettings(data); + setConcurrencyDraft(data.concurrency); + } catch (err) { + const msg = err instanceof Error && err.message ? err.message : t('Load failed'); + messageApi.error(msg); + } + }, [messageApi, t]); + + useEffect(() => { + loadTasks(true).catch(() => {}); + loadSettings().catch(() => {}); + }, [loadTasks, loadSettings]); + + useEffect(() => { + if (autoRefresh === 'off') return () => {}; + const timer = window.setInterval(() => { + loadTasks().catch(() => {}); + }, AUTO_REFRESH_INTERVAL); + return () => window.clearInterval(timer); + }, [autoRefresh, loadTasks]); + + const metrics = useMemo(() => { + const counts = { + total: tasks.length, + pending: 0, + running: 0, + success: 0, + failed: 0, + }; + tasks.forEach(task => { + counts[task.status] += 1; + }); + return counts; + }, [tasks]); + + const filteredTasks = useMemo(() => { + const normalizedKeyword = keyword.trim().toLowerCase(); + return tasks.filter(task => { + if (statusFilter.length > 0 && !statusFilter.includes(task.status)) return false; + if (!normalizedKeyword) return true; + const haystack = [ + task.id, + task.name, + JSON.stringify(task.task_info ?? {}), + task.meta ? JSON.stringify(task.meta) : '', + task.error ?? '', + ].join(' ').toLowerCase(); + return haystack.includes(normalizedKeyword); + }); + }, [keyword, statusFilter, tasks]); + + const progressLabel = useCallback((task: QueuedTask) => { + const percent = task.progress?.percent; + const stage = task.progress?.stage; + if (percent !== undefined && percent !== null) { + const percentText = `${percent.toFixed(1)}%`; + return stage ? `${percentText} · ${stage}` : percentText; + } + return stage ?? '--'; + }, []); + + const columns: ColumnsType = useMemo(() => ([ + { + title: 'ID', + dataIndex: 'id', + width: 160, + render: (id: string) => ( + + + + {id.slice(0, 8)} + + + + ), + }, + { + title: t('Task Type'), + dataIndex: 'name', + sorter: (a, b) => a.name.localeCompare(b.name), + width: 160, + }, + { + title: t('Status'), + dataIndex: 'status', + width: 120, + render: (status: QueueStatus) => { + const colorMap: Record = { + pending: 'default', + running: 'processing', + success: 'success', + failed: 'error', + }; + const labelMap: Record = { + pending: t('Pending'), + running: t('Running'), + success: t('Success'), + failed: t('Failed'), + }; + return {labelMap[status]}; + }, + }, + { + title: t('Progress'), + render: (_: unknown, record) => ( + record.progress?.percent !== undefined && record.progress.percent !== null + ? + : {progressLabel(record)} + ), + }, + { + title: t('Details'), + dataIndex: 'task_info', + render: (_: unknown, record) => { + const info = record.task_info ? JSON.stringify(record.task_info) : '--'; + return ( + + {info} + + ); + }, + }, + { + title: t('Error'), + dataIndex: 'error', + render: (error: string | undefined) => error ? {error} : '--', + }, + ]), [progressLabel, t]); + + const handleSaveSettings = useCallback(async () => { + setSettingsLoading(true); + try { + const next = await tasksApi.updateQueueSettings({ concurrency: concurrencyDraft }); + setSettings(next); + setConcurrencyDraft(next.concurrency); + messageApi.success(t('Settings saved')); + await loadTasks(); + } catch (err) { + const msg = err instanceof Error && err.message ? err.message : t('Operation failed'); + messageApi.error(msg); + } finally { + setSettingsLoading(false); + } + }, [concurrencyDraft, loadTasks, messageApi, t]); + + const lastUpdatedText = useMemo(() => { + if (!lastUpdated) return '--'; + return new Date(lastUpdated).toLocaleTimeString(); + }, [lastUpdated]); + + return ( + <> + {contextHolder} + + {t('Last updated at {time}', { time: lastUpdatedText })} + setAutoRefresh(value as 'on' | 'off')} + /> + + + } + > + + {[{ + label: t('Total Tasks'), value: metrics.total, color: '#1677ff' + }, { + label: t('Running Tasks'), value: metrics.running, color: '#52c41a' + }, { + label: t('Waiting Tasks'), value: metrics.pending, color: '#faad14' + }, { + label: t('Failed Tasks'), value: metrics.failed, color: '#ff4d4f' + }, { + label: t('Active Workers'), value: settings.active_workers, color: '#722ed1' + }].map((item) => ( + + + {item.label} + {item.value} + + + ))} + + +
+ + } + onChange={(e) => setKeyword(e.target.value)} + style={{ width: 260 }} + /> + + +