feat: implement cron-based automation task scheduling and update task configuration

This commit is contained in:
shiyu
2026-01-15 15:04:10 +08:00
parent 3e1b75d81a
commit ab89451b2d
14 changed files with 232 additions and 60 deletions

View File

@@ -1,4 +1,5 @@
from .service import TaskService
from .scheduler import task_scheduler
from .task_queue import Task, TaskProgress, TaskStatus, task_queue_service
from .types import (
AutomationTaskBase,
@@ -15,6 +16,7 @@ __all__ = [
"TaskProgress",
"TaskStatus",
"task_queue_service",
"task_scheduler",
"AutomationTaskBase",
"AutomationTaskCreate",
"AutomationTaskRead",

View File

@@ -59,8 +59,7 @@ async def get_task_status(task_id: str, request: Request, current_user: CurrentU
body_fields=[
"name",
"event",
"path_pattern",
"filename_regex",
"trigger_config",
"processor_type",
"processor_config",
"enabled",
@@ -93,8 +92,7 @@ async def list_tasks(request: Request, current_user: CurrentUser):
body_fields=[
"name",
"event",
"path_pattern",
"filename_regex",
"trigger_config",
"processor_type",
"processor_config",
"enabled",

102
domain/tasks/scheduler.py Normal file
View File

@@ -0,0 +1,102 @@
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime

from croniter import croniter

from models.database import AutomationTask

from .task_queue import task_queue_service
@dataclass
class CronTaskItem:
    """One cron-triggered automation task as tracked by the scheduler."""

    # ``id`` of the AutomationTask row this entry was built from.
    task_id: int
    # Passed as the task name when the entry is enqueued on the task queue.
    processor_type: str
    # Value of ``trigger_config["path"]``; forwarded in the queued payload.
    path: str
    # croniter iterator that yields the successive run times.
    cron: croniter
    # Next due time; compared against ``datetime.now()`` by the run loop.
    next_run: datetime
class AutomationTaskScheduler:
    """Poll-based scheduler that enqueues enabled ``cron`` automation tasks.

    ``start()`` loads every enabled task whose ``event`` is ``"cron"`` and
    spawns a background asyncio task. That task wakes up once per second,
    enqueues each item whose ``next_run`` has passed and advances it to its
    next occurrence. ``refresh()`` asks the running loop to reload the task
    list on its next iteration; ``stop()`` shuts the loop down.

    All times are naive datetimes taken from ``datetime.now()``.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        self._items: list[CronTaskItem] = []
        self._worker: asyncio.Task | None = None
        self._reload_event = asyncio.Event()
        self._stop_event = asyncio.Event()

    async def start(self) -> None:
        """Load the schedule and start the background loop (no-op if running)."""
        if self._worker and not self._worker.done():
            return
        self._stop_event.clear()
        # stop() leaves the reload flag set to wake the loop; drop it so the
        # fresh loop does not immediately reload what is loaded just below.
        self._reload_event.clear()
        await self._load_tasks()
        self._worker = asyncio.create_task(self._run_loop())

    async def stop(self) -> None:
        """Signal the background loop to exit and wait for it to finish."""
        worker = self._worker
        if not worker:
            return
        self._stop_event.set()
        self._reload_event.set()
        try:
            await worker
        except Exception:
            # A crashed worker must not make shutdown fail as well.
            self._logger.exception("Automation task scheduler worker exited with an error")
        finally:
            self._worker = None

    def refresh(self) -> None:
        """Ask the running loop to reload tasks; ignored when not running."""
        if self._worker and not self._worker.done():
            self._reload_event.set()

    async def _load_tasks(self) -> None:
        """Rebuild ``self._items`` from the enabled ``cron`` tasks.

        A task is skipped when its ``trigger_config`` is not a dict, when
        ``cron_expr`` or ``path`` is missing, or when the cron expression
        cannot be used. Bad expressions are logged so that a single bad row
        neither disappears silently nor blocks the remaining tasks.
        """
        tasks = await AutomationTask.filter(event="cron", enabled=True)
        now = datetime.now()
        items: list[CronTaskItem] = []
        for task in tasks:
            trigger = task.trigger_config or {}
            if not isinstance(trigger, dict):
                continue
            cron_expr = trigger.get("cron_expr")
            path = trigger.get("path")
            if not cron_expr or not path:
                continue
            cron = self._build_cron(cron_expr, now)
            if cron is None:
                self._logger.warning(
                    "Skipping automation task %s: invalid cron expression %r",
                    task.id,
                    cron_expr,
                )
                continue
            try:
                next_run = cron.get_next(datetime)
            except Exception:
                # e.g. a syntactically valid expression that never matches.
                self._logger.warning(
                    "Skipping automation task %s: no next run for cron expression %r",
                    task.id,
                    cron_expr,
                    exc_info=True,
                )
                continue
            items.append(
                CronTaskItem(
                    task_id=task.id,
                    processor_type=task.processor_type,
                    path=path,
                    cron=cron,
                    next_run=next_run,
                )
            )
        self._items = items

    def _build_cron(self, expr: str, base_time: datetime) -> croniter | None:
        """Return a croniter for ``expr`` starting at ``base_time``, or ``None``.

        Only 5-field and 6-field expressions are accepted; a 6-field
        expression is parsed with the seconds field first.
        """
        expr = str(expr or "").strip()
        if not expr:
            return None
        field_count = len(expr.split())
        if field_count not in (5, 6):
            return None
        try:
            return croniter(expr, base_time, second_at_beginning=field_count == 6)
        except Exception:
            # Best effort: any parse failure means "not schedulable".
            return None

    async def _run_loop(self) -> None:
        """Once per second: reload if requested, then enqueue all due items.

        Failures while reloading or enqueueing are logged and the loop keeps
        running, so a transient error cannot stop cron scheduling for good.
        """
        while not self._stop_event.is_set():
            if self._reload_event.is_set():
                self._reload_event.clear()
                try:
                    await self._load_tasks()
                except Exception:
                    self._logger.exception("Failed to reload cron automation tasks")
                    # Keep the previous schedule and retry on the next tick.
                    self._reload_event.set()
            now = datetime.now()
            for item in list(self._items):
                if item.next_run > now:
                    continue
                try:
                    await task_queue_service.add_task(
                        item.processor_type,
                        {"task_id": item.task_id, "path": item.path},
                    )
                except Exception:
                    self._logger.exception(
                        "Failed to enqueue cron automation task %s", item.task_id
                    )
                try:
                    item.next_run = item.cron.get_next(datetime)
                except Exception:
                    # Without a next occurrence the item would fire on every
                    # tick; drop it until the next reload.
                    self._logger.exception(
                        "No further run time for cron automation task %s; unscheduling",
                        item.task_id,
                    )
                    if item in self._items:
                        self._items.remove(item)
            await asyncio.sleep(1)
# Module-level scheduler instance; the package re-exports it and the task
# service calls ``task_scheduler.refresh()`` after creating, updating or deleting tasks.
task_scheduler = AutomationTaskScheduler()

View File

@@ -5,6 +5,7 @@ from fastapi import Depends, HTTPException
from domain.auth import User, get_current_active_user
from domain.config import ConfigService
from .scheduler import task_scheduler
from .task_queue import task_queue_service
from .types import (
AutomationTaskCreate,
@@ -46,6 +47,7 @@ class TaskService:
@classmethod
async def create_task(cls, payload: AutomationTaskCreate, user: Optional[User]) -> AutomationTask:
    """Persist a new automation task and ask the scheduler to reload.

    The row is created from ``payload.model_dump()``; ``user`` is accepted
    but not used in this method. Returns the created ``AutomationTask``.
    """
    fields = payload.model_dump()
    created = await AutomationTask.create(**fields)
    # Signal the cron scheduler so a newly added task is picked up.
    task_scheduler.refresh()
    return created
@classmethod
@@ -69,6 +71,7 @@ class TaskService:
for key, value in update_data.items():
setattr(task, key, value)
await task.save()
task_scheduler.refresh()
return task
@classmethod
@@ -76,6 +79,7 @@ class TaskService:
deleted_count = await AutomationTask.filter(id=task_id).delete()
if not deleted_count:
raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
task_scheduler.refresh()
@classmethod
async def trigger_tasks(cls, event: str, path: str):
@@ -86,11 +90,16 @@ class TaskService:
@classmethod
def match(cls, task: AutomationTask, path: str) -> bool:
if task.path_pattern and not path.startswith(task.path_pattern):
trigger_config = task.trigger_config or {}
if not isinstance(trigger_config, dict):
trigger_config = {}
path_prefix = trigger_config.get("path_prefix")
filename_regex = trigger_config.get("filename_regex")
if path_prefix and not path.startswith(path_prefix):
return False
if task.filename_regex:
if filename_regex:
filename = path.split("/")[-1]
if not re.match(task.filename_regex, filename):
if not re.match(filename_regex, filename):
return False
return True

View File

@@ -88,32 +88,27 @@ class TaskQueueService:
task.result = result
elif task.name == "automation_task" or self._is_processor_task(task.name):
from models.database import AutomationTask
from domain.processors import get_processor
params = task.task_info
auto_task = await AutomationTask.get(id=params["task_id"])
path = params["path"]
processor_type = auto_task.processor_type if task.name == "automation_task" else task.name
processor = get_processor(processor_type)
if not processor:
raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}")
if processor_type != auto_task.processor_type:
processor_type = auto_task.processor_type
processor = get_processor(processor_type)
if not processor:
raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}")
requires_input_bytes = bool(getattr(processor, "requires_input_bytes", True))
file_content = b""
if requires_input_bytes:
file_content = await VirtualFSService.read_file(path)
result = await processor.process(file_content, path, auto_task.processor_config)
save_to = auto_task.processor_config.get("save_to")
if save_to and getattr(processor, "produces_file", False):
await VirtualFSService.write_file(save_to, result)
processor_type = auto_task.processor_type
config = auto_task.processor_config or {}
save_to = config.get("save_to") if isinstance(config, dict) else None
overwrite = bool(config.get("overwrite")) if isinstance(config, dict) else False
try:
if await VirtualFSService.path_is_directory(path):
overwrite = True
except Exception:
pass
await VirtualFSService.process_file(
path=path,
processor_type=processor_type,
config=config if isinstance(config, dict) else {},
save_to=save_to,
overwrite=overwrite,
)
task.result = "Automation task completed"
elif task.name == "offline_http_download":
from domain.offline_downloads import OfflineDownloadService
@@ -129,7 +124,6 @@ class TaskQueueService:
task.result = "Email sent"
else:
raise ValueError(f"Unknown task name: {task.name}")
task.status = TaskStatus.SUCCESS
except Exception as e:

View File

@@ -6,8 +6,7 @@ from pydantic import BaseModel, Field
class AutomationTaskBase(BaseModel):
name: str
event: str
path_pattern: Optional[str] = None
filename_regex: Optional[str] = None
trigger_config: Dict[str, Any] = {}
processor_type: str
processor_config: Dict[str, Any] = {}
enabled: bool = True
@@ -22,6 +21,7 @@ class AutomationTaskUpdate(AutomationTaskBase):
event: Optional[str] = None
processor_type: Optional[str] = None
processor_config: Optional[Dict[str, Any]] = None
trigger_config: Optional[Dict[str, Any]] = None
enabled: Optional[bool] = None