mirror of
https://github.com/DrizzleTime/Foxel.git
synced 2026-06-25 17:23:59 +08:00
feat(task-queue): implement task queue management with settings and UI integration
This commit is contained in:
@@ -2,11 +2,17 @@ from fastapi import APIRouter, Depends, HTTPException
|
||||
from typing import Annotated
|
||||
|
||||
from models.database import AutomationTask
|
||||
from schemas.tasks import AutomationTaskCreate, AutomationTaskUpdate
|
||||
from schemas.tasks import (
|
||||
AutomationTaskCreate,
|
||||
AutomationTaskUpdate,
|
||||
TaskQueueSettings,
|
||||
TaskQueueSettingsResponse,
|
||||
)
|
||||
from api.response import success
|
||||
from services.auth import get_current_active_user, User
|
||||
from services.logging import LogService
|
||||
from services.task_queue import task_queue_service
|
||||
from services.config import ConfigCenter
|
||||
|
||||
router = APIRouter(
|
||||
prefix="/api/tasks",
|
||||
@@ -24,6 +30,37 @@ async def get_task_queue_status(
|
||||
return success([task.dict() for task in tasks])
|
||||
|
||||
|
||||
@router.get("/queue/settings")
|
||||
async def get_task_queue_settings(
|
||||
current_user: Annotated[User, Depends(get_current_active_user)],
|
||||
):
|
||||
payload = TaskQueueSettingsResponse(
|
||||
concurrency=task_queue_service.get_concurrency(),
|
||||
active_workers=task_queue_service.get_active_worker_count(),
|
||||
)
|
||||
return success(payload.model_dump())
|
||||
|
||||
|
||||
@router.post("/queue/settings")
|
||||
async def update_task_queue_settings(
|
||||
settings: TaskQueueSettings,
|
||||
current_user: Annotated[User, Depends(get_current_active_user)],
|
||||
):
|
||||
await task_queue_service.set_concurrency(settings.concurrency)
|
||||
await ConfigCenter.set("TASK_QUEUE_CONCURRENCY", str(task_queue_service.get_concurrency()))
|
||||
await LogService.action(
|
||||
"route:tasks",
|
||||
"Updated task queue settings",
|
||||
details={"concurrency": settings.concurrency},
|
||||
user_id=getattr(current_user, "id", None),
|
||||
)
|
||||
payload = TaskQueueSettingsResponse(
|
||||
concurrency=task_queue_service.get_concurrency(),
|
||||
active_workers=task_queue_service.get_active_worker_count(),
|
||||
)
|
||||
return success(payload.model_dump())
|
||||
|
||||
|
||||
@router.get("/queue/{task_id}")
|
||||
async def get_task_status(
|
||||
task_id: str,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
|
||||
@@ -29,3 +29,11 @@ class AutomationTaskRead(AutomationTaskBase):
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class TaskQueueSettings(BaseModel):
|
||||
concurrency: int = Field(..., ge=1, description="Desired number of concurrent task workers")
|
||||
|
||||
|
||||
class TaskQueueSettingsResponse(TaskQueueSettings):
|
||||
active_workers: int = Field(..., ge=0, description="Currently running worker count")
|
||||
|
||||
@@ -32,11 +32,16 @@ class Task(BaseModel):
|
||||
meta: Dict[str, Any] | None = None
|
||||
|
||||
|
||||
_SENTINEL = object()
|
||||
|
||||
|
||||
class TaskQueueService:
|
||||
def __init__(self):
|
||||
self._queue = asyncio.Queue()
|
||||
self._queue: asyncio.Queue[Task | object] = asyncio.Queue()
|
||||
self._tasks: Dict[str, Task] = {}
|
||||
self._worker_task: asyncio.Task | None = None
|
||||
self._worker_tasks: list[asyncio.Task] = []
|
||||
self._concurrency: int = 1
|
||||
self._worker_seq: int = 0
|
||||
|
||||
async def add_task(self, name: str, task_info: Dict[str, Any]) -> Task:
|
||||
task = Task(name=name, task_info=task_info)
|
||||
@@ -83,7 +88,7 @@ class TaskQueueService:
|
||||
overwrite=params.get("overwrite", False),
|
||||
)
|
||||
task.result = result
|
||||
elif task.name == "automation_task":
|
||||
elif task.name == "automation_task" or self._is_processor_task(task.name):
|
||||
from models.database import AutomationTask
|
||||
from services.processors.registry import get as get_processor
|
||||
from services.virtual_fs import read_file, write_file
|
||||
@@ -92,9 +97,21 @@ class TaskQueueService:
|
||||
auto_task = await AutomationTask.get(id=params["task_id"])
|
||||
path = params["path"]
|
||||
|
||||
processor = get_processor(auto_task.processor_type)
|
||||
processor_type = auto_task.processor_type if task.name == "automation_task" else task.name
|
||||
processor = get_processor(processor_type)
|
||||
if not processor:
|
||||
raise ValueError(f"Processor {auto_task.processor_type} not found for task {auto_task.id}")
|
||||
raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}")
|
||||
|
||||
if processor_type != auto_task.processor_type:
|
||||
await LogService.warning(
|
||||
"task_queue",
|
||||
"Processor type mismatch; falling back to stored type",
|
||||
{"task_id": auto_task.id, "expected": auto_task.processor_type, "got": processor_type},
|
||||
)
|
||||
processor_type = auto_task.processor_type
|
||||
processor = get_processor(processor_type)
|
||||
if not processor:
|
||||
raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}")
|
||||
|
||||
file_content = await read_file(path)
|
||||
result = await processor.process(file_content, path, auto_task.processor_config)
|
||||
@@ -124,35 +141,88 @@ class TaskQueueService:
|
||||
task.error = str(e)
|
||||
await LogService.error("task_queue", f"Task {task.name} ({task.id}) failed: {e}", {"task_id": task.id, "name": task.name})
|
||||
|
||||
async def worker(self):
|
||||
await LogService.info("task_queue", "Task worker started")
|
||||
while True:
|
||||
try:
|
||||
task = await self._queue.get()
|
||||
await self._execute_task(task)
|
||||
except asyncio.CancelledError:
|
||||
await LogService.info("task_queue", "Task worker stopped")
|
||||
break
|
||||
except Exception as e:
|
||||
await LogService.error("task_queue", f"Error in task worker: {e}", exc_info=True)
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
def _cleanup_workers(self):
|
||||
self._worker_tasks = [task for task in self._worker_tasks if not task.done()]
|
||||
|
||||
async def start_worker(self):
|
||||
if self._worker_task is None or self._worker_task.done():
|
||||
self._worker_task = asyncio.create_task(self.worker())
|
||||
await LogService.info("task_queue", "Task worker created.")
|
||||
def _is_processor_task(self, task_name: str) -> bool:
|
||||
try:
|
||||
from services.processors.registry import get as get_processor
|
||||
|
||||
return get_processor(task_name) is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def _ensure_worker_count(self):
|
||||
self._cleanup_workers()
|
||||
current = len(self._worker_tasks)
|
||||
if current < self._concurrency:
|
||||
for _ in range(self._concurrency - current):
|
||||
self._worker_seq += 1
|
||||
worker_id = self._worker_seq
|
||||
worker_task = asyncio.create_task(self._worker_loop(worker_id))
|
||||
self._worker_tasks.append(worker_task)
|
||||
await LogService.info("task_queue", "Task workers adjusted", {"active_workers": len(self._worker_tasks), "target": self._concurrency})
|
||||
elif current > self._concurrency:
|
||||
for _ in range(current - self._concurrency):
|
||||
await self._queue.put(_SENTINEL)
|
||||
await LogService.info("task_queue", "Task workers scaling down", {"active_workers": len(self._worker_tasks), "target": self._concurrency})
|
||||
|
||||
async def _worker_loop(self, worker_id: int):
|
||||
current_task = asyncio.current_task()
|
||||
await LogService.info("task_queue", f"Worker {worker_id} started")
|
||||
try:
|
||||
while True:
|
||||
job = await self._queue.get()
|
||||
if job is _SENTINEL:
|
||||
self._queue.task_done()
|
||||
break
|
||||
try:
|
||||
await self._execute_task(job)
|
||||
except Exception as e:
|
||||
await LogService.error(
|
||||
"task_queue",
|
||||
f"Error executing task {job.id}: {e}",
|
||||
{"task_id": job.id, "name": job.name},
|
||||
)
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
finally:
|
||||
if current_task in self._worker_tasks:
|
||||
self._worker_tasks.remove(current_task) # type: ignore[arg-type]
|
||||
await LogService.info("task_queue", f"Worker {worker_id} stopped")
|
||||
|
||||
async def start_worker(self, concurrency: int | None = None):
|
||||
if concurrency is None:
|
||||
from services.config import ConfigCenter
|
||||
|
||||
stored_value = await ConfigCenter.get("TASK_QUEUE_CONCURRENCY", self._concurrency)
|
||||
try:
|
||||
concurrency = int(stored_value)
|
||||
except (TypeError, ValueError):
|
||||
concurrency = self._concurrency
|
||||
await self.set_concurrency(concurrency)
|
||||
|
||||
async def set_concurrency(self, value: int):
|
||||
value = max(1, int(value))
|
||||
if value != self._concurrency:
|
||||
self._concurrency = value
|
||||
await self._ensure_worker_count()
|
||||
|
||||
async def stop_worker(self):
|
||||
if self._worker_task and not self._worker_task.done():
|
||||
self._worker_task.cancel()
|
||||
try:
|
||||
await self._worker_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
self._worker_task = None
|
||||
await LogService.info("task_queue", "Task worker has been stopped.")
|
||||
self._cleanup_workers()
|
||||
for _ in range(len(self._worker_tasks)):
|
||||
await self._queue.put(_SENTINEL)
|
||||
if self._worker_tasks:
|
||||
await asyncio.gather(*self._worker_tasks, return_exceptions=True)
|
||||
self._worker_tasks.clear()
|
||||
await LogService.info("task_queue", "Task workers have been stopped.")
|
||||
|
||||
def get_concurrency(self) -> int:
|
||||
return self._concurrency
|
||||
|
||||
def get_active_worker_count(self) -> int:
|
||||
self._cleanup_workers()
|
||||
return len(self._worker_tasks)
|
||||
|
||||
|
||||
task_queue_service = TaskQueueService()
|
||||
|
||||
@@ -25,11 +25,11 @@ class TaskService:
|
||||
|
||||
async def execute(self, task: AutomationTask, path: str):
|
||||
await task_queue_service.add_task(
|
||||
"automation_task",
|
||||
task.processor_type,
|
||||
{
|
||||
"task_id": task.id,
|
||||
"path": path,
|
||||
},
|
||||
)
|
||||
|
||||
task_service = TaskService()
|
||||
task_service = TaskService()
|
||||
|
||||
@@ -26,10 +26,21 @@ export interface QueuedTask {
|
||||
meta?: Record<string, any> | null;
|
||||
}
|
||||
|
||||
export interface TaskQueueSettings {
|
||||
concurrency: number;
|
||||
active_workers: number;
|
||||
}
|
||||
|
||||
export interface TaskQueueSettingsUpdate {
|
||||
concurrency: number;
|
||||
}
|
||||
|
||||
export const tasksApi = {
|
||||
list: () => request<AutomationTask[]>('/tasks/'),
|
||||
create: (payload: AutomationTaskCreate) => request<AutomationTask>('/tasks/', { method: 'POST', json: payload }),
|
||||
update: (id: number, payload: AutomationTaskUpdate) => request<AutomationTask>(`/tasks/${id}`, { method: 'PUT', json: payload }),
|
||||
remove: (id: number) => request<void>(`/tasks/${id}`, { method: 'DELETE' }),
|
||||
getQueue: () => request<QueuedTask[]>('/tasks/queue'),
|
||||
getQueueSettings: () => request<TaskQueueSettings>('/tasks/queue/settings'),
|
||||
updateQueueSettings: (payload: TaskQueueSettingsUpdate) => request<TaskQueueSettings>('/tasks/queue/settings', { method: 'POST', json: payload }),
|
||||
};
|
||||
|
||||
@@ -179,6 +179,24 @@ export const en = {
|
||||
'Copy Markdown': 'Copy Markdown',
|
||||
'Close': 'Close',
|
||||
|
||||
// Task queue
|
||||
'Task Queue': 'Task Queue',
|
||||
'Last updated at {time}': 'Last updated at {time}',
|
||||
'Total Tasks': 'Total Tasks',
|
||||
'Running Tasks': 'Running Tasks',
|
||||
'Waiting Tasks': 'Waiting Tasks',
|
||||
'Failed Tasks': 'Failed Tasks',
|
||||
'Active Workers': 'Active Workers',
|
||||
'Task Type': 'Task Type',
|
||||
'Search by name or ID': 'Search by name or ID',
|
||||
'Filter by status': 'Filter by status',
|
||||
'Queue Concurrency': 'Queue Concurrency',
|
||||
'Settings saved': 'Settings saved',
|
||||
'Expand': 'Expand',
|
||||
'Adjust worker concurrency immediately': 'Adjust worker concurrency immediately',
|
||||
'Auto': 'Auto',
|
||||
'Manual': 'Manual',
|
||||
|
||||
// File detail
|
||||
'Camera Make': 'Camera Make',
|
||||
'Camera Model': 'Camera Model',
|
||||
@@ -307,7 +325,6 @@ export const en = {
|
||||
|
||||
// Tasks
|
||||
'Automation Tasks': 'Automation Tasks',
|
||||
'Running Tasks': 'Running Tasks',
|
||||
'Create Task': 'Create Task',
|
||||
'Edit Task': 'Edit Task',
|
||||
'Create Automation Task': 'Create Automation Task',
|
||||
|
||||
@@ -181,6 +181,22 @@ export const zh = {
|
||||
'Copy Markdown': '复制 Markdown',
|
||||
'Close': '关闭',
|
||||
|
||||
// Task queue
|
||||
'Task Queue': '任务队列',
|
||||
'Last updated at {time}': '上次刷新时间 {time}',
|
||||
'Total Tasks': '任务总数',
|
||||
'Waiting Tasks': '等待中的任务',
|
||||
'Failed Tasks': '失败的任务',
|
||||
'Active Workers': '活跃 Worker 数',
|
||||
'Task Type': '任务类型',
|
||||
'Search by name or ID': '按名称或 ID 搜索',
|
||||
'Filter by status': '按状态筛选',
|
||||
'Queue Concurrency': '队列并发数',
|
||||
'Settings saved': '设置已保存',
|
||||
'Expand': '展开',
|
||||
'Adjust worker concurrency immediately': '立即调整任务并发数',
|
||||
'Auto': '自动',
|
||||
'Manual': '手动',
|
||||
// File detail
|
||||
'Camera Make': '设备品牌',
|
||||
'Camera Model': '设备型号',
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
DatabaseOutlined,
|
||||
AppstoreOutlined,
|
||||
CodeOutlined,
|
||||
ClockCircleOutlined,
|
||||
} from '@ant-design/icons';
|
||||
import type { ReactNode } from 'react';
|
||||
|
||||
@@ -30,6 +31,7 @@ export const navGroups: NavGroup[] = [
|
||||
children: [
|
||||
{ key: 'processors', icon: React.createElement(CodeOutlined), label: 'Processors' },
|
||||
{ key: 'tasks', icon: React.createElement(RobotOutlined), label: 'Automation' },
|
||||
{ key: 'task-queue', icon: React.createElement(ClockCircleOutlined), label: 'Task Queue' },
|
||||
{ key: 'share', icon: React.createElement(ShareAltOutlined), label: 'My Shares' },
|
||||
{ key: 'offline', icon: React.createElement(CloudDownloadOutlined), label: 'Offline Downloads' },
|
||||
{ key: 'adapters', icon: React.createElement(ApiOutlined), label: 'Adapters' },
|
||||
|
||||
297
web/src/pages/TaskQueuePage.tsx
Normal file
297
web/src/pages/TaskQueuePage.tsx
Normal file
@@ -0,0 +1,297 @@
|
||||
import { memo, useCallback, useEffect, useMemo, useState } from 'react';
|
||||
import { Button, Card, Col, Form, Input, InputNumber, message, Progress, Row, Segmented, Select, Space, Table, Tag, Tooltip, Typography } from 'antd';
|
||||
import type { ColumnsType } from 'antd/es/table';
|
||||
import PageCard from '../components/PageCard';
|
||||
import { tasksApi, type QueuedTask, type TaskQueueSettings } from '../api/tasks';
|
||||
import { useI18n } from '../i18n';
|
||||
import { SearchOutlined } from '@ant-design/icons';
|
||||
|
||||
type QueueStatus = QueuedTask['status'];
|
||||
|
||||
const AUTO_REFRESH_INTERVAL = 5000;
|
||||
|
||||
const TaskQueuePage = memo(function TaskQueuePage() {
|
||||
const { t } = useI18n();
|
||||
const [messageApi, contextHolder] = message.useMessage();
|
||||
const [tasks, setTasks] = useState<QueuedTask[]>([]);
|
||||
const [loading, setLoading] = useState(false);
|
||||
const [keyword, setKeyword] = useState('');
|
||||
const [statusFilter, setStatusFilter] = useState<QueueStatus[]>(['pending', 'running']);
|
||||
const [autoRefresh, setAutoRefresh] = useState<'on' | 'off'>('on');
|
||||
const [settings, setSettings] = useState<TaskQueueSettings>({ concurrency: 1, active_workers: 0 });
|
||||
const [concurrencyDraft, setConcurrencyDraft] = useState<number>(1);
|
||||
const [settingsLoading, setSettingsLoading] = useState(false);
|
||||
const [lastUpdated, setLastUpdated] = useState<number | null>(null);
|
||||
|
||||
const statusOptions = useMemo(() => ([
|
||||
{ label: t('Pending'), value: 'pending' },
|
||||
{ label: t('Running'), value: 'running' },
|
||||
{ label: t('Success'), value: 'success' },
|
||||
{ label: t('Failed'), value: 'failed' },
|
||||
]), [t]);
|
||||
|
||||
const loadTasks = useCallback(async (withSpinner = false) => {
|
||||
if (withSpinner) setLoading(true);
|
||||
try {
|
||||
const data = await tasksApi.getQueue();
|
||||
setTasks(data);
|
||||
setLastUpdated(Date.now());
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error && err.message ? err.message : t('Load failed');
|
||||
messageApi.error(msg);
|
||||
} finally {
|
||||
if (withSpinner) setLoading(false);
|
||||
}
|
||||
}, [messageApi, t]);
|
||||
|
||||
const loadSettings = useCallback(async () => {
|
||||
try {
|
||||
const data = await tasksApi.getQueueSettings();
|
||||
setSettings(data);
|
||||
setConcurrencyDraft(data.concurrency);
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error && err.message ? err.message : t('Load failed');
|
||||
messageApi.error(msg);
|
||||
}
|
||||
}, [messageApi, t]);
|
||||
|
||||
useEffect(() => {
|
||||
loadTasks(true).catch(() => {});
|
||||
loadSettings().catch(() => {});
|
||||
}, [loadTasks, loadSettings]);
|
||||
|
||||
useEffect(() => {
|
||||
if (autoRefresh === 'off') return () => {};
|
||||
const timer = window.setInterval(() => {
|
||||
loadTasks().catch(() => {});
|
||||
}, AUTO_REFRESH_INTERVAL);
|
||||
return () => window.clearInterval(timer);
|
||||
}, [autoRefresh, loadTasks]);
|
||||
|
||||
const metrics = useMemo(() => {
|
||||
const counts = {
|
||||
total: tasks.length,
|
||||
pending: 0,
|
||||
running: 0,
|
||||
success: 0,
|
||||
failed: 0,
|
||||
};
|
||||
tasks.forEach(task => {
|
||||
counts[task.status] += 1;
|
||||
});
|
||||
return counts;
|
||||
}, [tasks]);
|
||||
|
||||
const filteredTasks = useMemo(() => {
|
||||
const normalizedKeyword = keyword.trim().toLowerCase();
|
||||
return tasks.filter(task => {
|
||||
if (statusFilter.length > 0 && !statusFilter.includes(task.status)) return false;
|
||||
if (!normalizedKeyword) return true;
|
||||
const haystack = [
|
||||
task.id,
|
||||
task.name,
|
||||
JSON.stringify(task.task_info ?? {}),
|
||||
task.meta ? JSON.stringify(task.meta) : '',
|
||||
task.error ?? '',
|
||||
].join(' ').toLowerCase();
|
||||
return haystack.includes(normalizedKeyword);
|
||||
});
|
||||
}, [keyword, statusFilter, tasks]);
|
||||
|
||||
const progressLabel = useCallback((task: QueuedTask) => {
|
||||
const percent = task.progress?.percent;
|
||||
const stage = task.progress?.stage;
|
||||
if (percent !== undefined && percent !== null) {
|
||||
const percentText = `${percent.toFixed(1)}%`;
|
||||
return stage ? `${percentText} · ${stage}` : percentText;
|
||||
}
|
||||
return stage ?? '--';
|
||||
}, []);
|
||||
|
||||
const columns: ColumnsType<QueuedTask> = useMemo(() => ([
|
||||
{
|
||||
title: 'ID',
|
||||
dataIndex: 'id',
|
||||
width: 160,
|
||||
render: (id: string) => (
|
||||
<Tooltip title={id}>
|
||||
<Tag color="default" style={{ cursor: 'pointer', margin: 0 }}>
|
||||
<Typography.Text style={{ fontSize: 12 }} copyable={{ text: id }}>
|
||||
{id.slice(0, 8)}
|
||||
</Typography.Text>
|
||||
</Tag>
|
||||
</Tooltip>
|
||||
),
|
||||
},
|
||||
{
|
||||
title: t('Task Type'),
|
||||
dataIndex: 'name',
|
||||
sorter: (a, b) => a.name.localeCompare(b.name),
|
||||
width: 160,
|
||||
},
|
||||
{
|
||||
title: t('Status'),
|
||||
dataIndex: 'status',
|
||||
width: 120,
|
||||
render: (status: QueueStatus) => {
|
||||
const colorMap: Record<QueueStatus, string> = {
|
||||
pending: 'default',
|
||||
running: 'processing',
|
||||
success: 'success',
|
||||
failed: 'error',
|
||||
};
|
||||
const labelMap: Record<QueueStatus, string> = {
|
||||
pending: t('Pending'),
|
||||
running: t('Running'),
|
||||
success: t('Success'),
|
||||
failed: t('Failed'),
|
||||
};
|
||||
return <Tag color={colorMap[status]}>{labelMap[status]}</Tag>;
|
||||
},
|
||||
},
|
||||
{
|
||||
title: t('Progress'),
|
||||
render: (_: unknown, record) => (
|
||||
record.progress?.percent !== undefined && record.progress.percent !== null
|
||||
? <Progress percent={Number(record.progress.percent.toFixed(1))} size="small" status={record.status === 'failed' ? 'exception' : record.status === 'success' ? 'success' : 'active'} />
|
||||
: <Typography.Text type="secondary" style={{ fontSize: 12 }}>{progressLabel(record)}</Typography.Text>
|
||||
),
|
||||
},
|
||||
{
|
||||
title: t('Details'),
|
||||
dataIndex: 'task_info',
|
||||
render: (_: unknown, record) => {
|
||||
const info = record.task_info ? JSON.stringify(record.task_info) : '--';
|
||||
return (
|
||||
<Typography.Paragraph
|
||||
ellipsis={{ rows: 2, expandable: true, symbol: t('Expand') }}
|
||||
style={{ marginBottom: 0, fontFamily: 'Menlo, Consolas, monospace', fontSize: 12 }}
|
||||
>
|
||||
{info}
|
||||
</Typography.Paragraph>
|
||||
);
|
||||
},
|
||||
},
|
||||
{
|
||||
title: t('Error'),
|
||||
dataIndex: 'error',
|
||||
render: (error: string | undefined) => error ? <Typography.Text type="danger">{error}</Typography.Text> : '--',
|
||||
},
|
||||
]), [progressLabel, t]);
|
||||
|
||||
const handleSaveSettings = useCallback(async () => {
|
||||
setSettingsLoading(true);
|
||||
try {
|
||||
const next = await tasksApi.updateQueueSettings({ concurrency: concurrencyDraft });
|
||||
setSettings(next);
|
||||
setConcurrencyDraft(next.concurrency);
|
||||
messageApi.success(t('Settings saved'));
|
||||
await loadTasks();
|
||||
} catch (err) {
|
||||
const msg = err instanceof Error && err.message ? err.message : t('Operation failed');
|
||||
messageApi.error(msg);
|
||||
} finally {
|
||||
setSettingsLoading(false);
|
||||
}
|
||||
}, [concurrencyDraft, loadTasks, messageApi, t]);
|
||||
|
||||
const lastUpdatedText = useMemo(() => {
|
||||
if (!lastUpdated) return '--';
|
||||
return new Date(lastUpdated).toLocaleTimeString();
|
||||
}, [lastUpdated]);
|
||||
|
||||
return (
|
||||
<>
|
||||
{contextHolder}
|
||||
<PageCard
|
||||
title={t('Task Queue')}
|
||||
extra={
|
||||
<Space size={16} wrap>
|
||||
<Typography.Text type="secondary">{t('Last updated at {time}', { time: lastUpdatedText })}</Typography.Text>
|
||||
<Segmented
|
||||
options={[{ label: t('Auto'), value: 'on' }, { label: t('Manual'), value: 'off' }]}
|
||||
value={autoRefresh}
|
||||
onChange={(value) => setAutoRefresh(value as 'on' | 'off')}
|
||||
/>
|
||||
<Button onClick={() => loadTasks(true)} loading={loading} type="default" disabled={autoRefresh === 'on'}>
|
||||
{t('Refresh')}
|
||||
</Button>
|
||||
</Space>
|
||||
}
|
||||
>
|
||||
<Row gutter={[16, 16]} style={{ marginBottom: 24 }}>
|
||||
{[{
|
||||
label: t('Total Tasks'), value: metrics.total, color: '#1677ff'
|
||||
}, {
|
||||
label: t('Running Tasks'), value: metrics.running, color: '#52c41a'
|
||||
}, {
|
||||
label: t('Waiting Tasks'), value: metrics.pending, color: '#faad14'
|
||||
}, {
|
||||
label: t('Failed Tasks'), value: metrics.failed, color: '#ff4d4f'
|
||||
}, {
|
||||
label: t('Active Workers'), value: settings.active_workers, color: '#722ed1'
|
||||
}].map((item) => (
|
||||
<Col key={item.label} xs={24} sm={12} md={8} lg={6} xl={4}>
|
||||
<Card size="small" bodyStyle={{ padding: '12px 16px' }}>
|
||||
<Typography.Text type="secondary">{item.label}</Typography.Text>
|
||||
<Typography.Title level={4} style={{ margin: '4px 0 0', color: item.color }}>{item.value}</Typography.Title>
|
||||
</Card>
|
||||
</Col>
|
||||
))}
|
||||
</Row>
|
||||
|
||||
<Form layout="inline" style={{ marginBottom: 16, gap: 16, flexWrap: 'wrap' }}>
|
||||
<Form.Item>
|
||||
<Input
|
||||
allowClear
|
||||
placeholder={t('Search by name or ID')}
|
||||
prefix={<SearchOutlined style={{ color: 'var(--ant-color-text-tertiary)' }} />}
|
||||
onChange={(e) => setKeyword(e.target.value)}
|
||||
style={{ width: 260 }}
|
||||
/>
|
||||
</Form.Item>
|
||||
<Form.Item>
|
||||
<Select
|
||||
mode="multiple"
|
||||
allowClear
|
||||
placeholder={t('Filter by status')}
|
||||
value={statusFilter}
|
||||
maxTagCount={2}
|
||||
onChange={(value) => setStatusFilter(value as QueueStatus[])}
|
||||
options={statusOptions}
|
||||
style={{ minWidth: 220 }}
|
||||
/>
|
||||
</Form.Item>
|
||||
<Form.Item>
|
||||
<Tooltip title={t('Adjust worker concurrency immediately')}>
|
||||
<Space size={8}>
|
||||
<Typography.Text>{t('Queue Concurrency')}:</Typography.Text>
|
||||
<InputNumber
|
||||
min={1}
|
||||
controls={false}
|
||||
style={{ width: 80 }}
|
||||
value={concurrencyDraft}
|
||||
onChange={(value) => setConcurrencyDraft(value ?? 1)}
|
||||
/>
|
||||
<Button type="primary" onClick={handleSaveSettings} loading={settingsLoading}>
|
||||
{t('Save')}
|
||||
</Button>
|
||||
</Space>
|
||||
</Tooltip>
|
||||
</Form.Item>
|
||||
</Form>
|
||||
|
||||
<Table
|
||||
rowKey="id"
|
||||
dataSource={filteredTasks}
|
||||
columns={columns}
|
||||
loading={loading}
|
||||
pagination={{ pageSize: 10 }}
|
||||
style={{ marginBottom: 0 }}
|
||||
/>
|
||||
</PageCard>
|
||||
</>
|
||||
);
|
||||
});
|
||||
|
||||
export default TaskQueuePage;
|
||||
@@ -1,7 +1,7 @@
|
||||
import { memo, useState, useEffect, useCallback } from 'react';
|
||||
import { Table, Button, Space, Drawer, Form, Input, Switch, message, Typography, Popconfirm, Select, Modal, Tag } from 'antd';
|
||||
import { Table, Button, Space, Drawer, Form, Input, Switch, message, Typography, Popconfirm, Select } from 'antd';
|
||||
import PageCard from '../components/PageCard';
|
||||
import { tasksApi, type AutomationTask, type QueuedTask } from '../api/tasks';
|
||||
import { tasksApi, type AutomationTask } from '../api/tasks';
|
||||
import { processorsApi, type ProcessorTypeMeta } from '../api/processors';
|
||||
import { ProcessorConfigForm } from '../components/ProcessorConfigForm';
|
||||
import { useI18n } from '../i18n';
|
||||
@@ -14,9 +14,6 @@ const TasksPage = memo(function TasksPage() {
|
||||
const [editing, setEditing] = useState<AutomationTask | null>(null);
|
||||
const [form] = Form.useForm();
|
||||
const [availableProcessors, setAvailableProcessors] = useState<ProcessorTypeMeta[]>([]);
|
||||
const [queueModalOpen, setQueueModalOpen] = useState(false);
|
||||
const [queuedTasks, setQueuedTasks] = useState<QueuedTask[]>([]);
|
||||
const [queueLoading, setQueueLoading] = useState(false);
|
||||
const { t } = useI18n();
|
||||
const [pathPickerOpen, setPathPickerOpen] = useState(false);
|
||||
|
||||
@@ -93,23 +90,6 @@ const TasksPage = memo(function TasksPage() {
|
||||
}
|
||||
};
|
||||
|
||||
const fetchQueue = async () => {
|
||||
setQueueLoading(true);
|
||||
try {
|
||||
const tasks = await tasksApi.getQueue();
|
||||
setQueuedTasks(tasks);
|
||||
} catch (e: any) {
|
||||
message.error(e.message || '加载队列失败');
|
||||
} finally {
|
||||
setQueueLoading(false);
|
||||
}
|
||||
};
|
||||
|
||||
const openQueueModal = () => {
|
||||
setQueueModalOpen(true);
|
||||
fetchQueue();
|
||||
};
|
||||
|
||||
const toggleEnabled = async (rec: AutomationTask, enabled: boolean) => {
|
||||
setEditing(rec);
|
||||
setLoading(true);
|
||||
@@ -162,7 +142,6 @@ const TasksPage = memo(function TasksPage() {
|
||||
extra={
|
||||
<Space>
|
||||
<Button onClick={fetchList} loading={loading}>{t('Refresh')}</Button>
|
||||
<Button onClick={openQueueModal}>{t('Running Tasks')}</Button>
|
||||
<Button type="primary" onClick={openCreate}>{t('Create Task')}</Button>
|
||||
</Space>
|
||||
}
|
||||
@@ -232,40 +211,6 @@ const TasksPage = memo(function TasksPage() {
|
||||
onCancel={() => setPathPickerOpen(false)}
|
||||
onOk={(p) => { form.setFieldsValue({ path_pattern: p }); setPathPickerOpen(false); }}
|
||||
/>
|
||||
<Modal
|
||||
title={t('Current Task Queue')}
|
||||
open={queueModalOpen}
|
||||
onCancel={() => setQueueModalOpen(false)}
|
||||
width={800}
|
||||
footer={[
|
||||
<Button key="refresh" onClick={fetchQueue} loading={queueLoading}>{t('Refresh')}</Button>,
|
||||
<Button key="close" onClick={() => setQueueModalOpen(false)}>{t('Close')}</Button>
|
||||
]}
|
||||
>
|
||||
<Table
|
||||
size="small"
|
||||
rowKey="id"
|
||||
dataSource={queuedTasks}
|
||||
loading={queueLoading}
|
||||
pagination={false}
|
||||
columns={[
|
||||
{ title: 'ID', dataIndex: 'id', width: 120, render: (id) => <Typography.Text style={{ fontSize: 12 }} copyable={{ text: id }}>{id.slice(0, 8)}</Typography.Text> },
|
||||
{ title: t('Task Name'), dataIndex: 'name' },
|
||||
{ title: t('Params'), dataIndex: 'task_info', render: (info) => <Typography.Text type="secondary" style={{ fontSize: 12 }}>{JSON.stringify(info)}</Typography.Text> },
|
||||
{
|
||||
title: t('Status'), dataIndex: 'status', width: 100, render: (status: QueuedTask['status']) => {
|
||||
const colorMap = {
|
||||
pending: 'default',
|
||||
running: 'processing',
|
||||
success: 'success',
|
||||
failed: 'error'
|
||||
};
|
||||
return <Tag color={colorMap[status]}>{status}</Tag>;
|
||||
}
|
||||
},
|
||||
]}
|
||||
/>
|
||||
</Modal>
|
||||
</PageCard>
|
||||
);
|
||||
});
|
||||
|
||||
@@ -7,6 +7,7 @@ import FileExplorerPage from '../pages/FileExplorerPage/FileExplorerPage.tsx';
|
||||
import AdaptersPage from '../pages/AdaptersPage.tsx';
|
||||
import SharePage from '../pages/SharePage.tsx';
|
||||
import TasksPage from '../pages/TasksPage.tsx';
|
||||
import TaskQueuePage from '../pages/TaskQueuePage.tsx';
|
||||
import ProcessorsPage from '../pages/ProcessorsPage.tsx';
|
||||
import OfflineDownloadPage from '../pages/OfflineDownloadPage.tsx';
|
||||
import SystemSettingsPage from '../pages/SystemSettingsPage/SystemSettingsPage.tsx';
|
||||
@@ -38,6 +39,7 @@ const ShellBody = memo(function ShellBody() {
|
||||
{navKey === 'files' && <FileExplorerPage />}
|
||||
{navKey === 'share' && <SharePage />}
|
||||
{navKey === 'tasks' && <TasksPage />}
|
||||
{navKey === 'task-queue' && <TaskQueuePage />}
|
||||
{navKey === 'processors' && <ProcessorsPage />}
|
||||
{navKey === 'offline' && <OfflineDownloadPage />}
|
||||
{navKey === 'plugins' && <PluginsPage />}
|
||||
|
||||
Reference in New Issue
Block a user