mirror of
https://github.com/DrizzleTime/Foxel.git
synced 2026-05-11 09:59:50 +08:00
103 lines
3.1 KiB
Python
103 lines
3.1 KiB
Python
import asyncio
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
|
|
from croniter import croniter
|
|
|
|
from models.database import AutomationTask
|
|
from .task_queue import task_queue_service
|
|
|
|
|
|
@dataclass
|
|
class CronTaskItem:
|
|
task_id: int
|
|
processor_type: str
|
|
path: str
|
|
cron: croniter
|
|
next_run: datetime
|
|
|
|
|
|
class AutomationTaskScheduler:
|
|
def __init__(self):
|
|
self._items: list[CronTaskItem] = []
|
|
self._worker: asyncio.Task | None = None
|
|
self._reload_event = asyncio.Event()
|
|
self._stop_event = asyncio.Event()
|
|
|
|
async def start(self) -> None:
|
|
if self._worker and not self._worker.done():
|
|
return
|
|
self._stop_event.clear()
|
|
await self._load_tasks()
|
|
self._worker = asyncio.create_task(self._run_loop())
|
|
|
|
async def stop(self) -> None:
|
|
if not self._worker:
|
|
return
|
|
self._stop_event.set()
|
|
self._reload_event.set()
|
|
await self._worker
|
|
self._worker = None
|
|
|
|
def refresh(self) -> None:
|
|
if self._worker and not self._worker.done():
|
|
self._reload_event.set()
|
|
|
|
async def _load_tasks(self) -> None:
|
|
tasks = await AutomationTask.filter(event="cron", enabled=True)
|
|
items: list[CronTaskItem] = []
|
|
now = datetime.now()
|
|
for task in tasks:
|
|
trigger = task.trigger_config or {}
|
|
if not isinstance(trigger, dict):
|
|
continue
|
|
cron_expr = trigger.get("cron_expr")
|
|
path = trigger.get("path")
|
|
if not cron_expr or not path:
|
|
continue
|
|
cron = self._build_cron(cron_expr, now)
|
|
if not cron:
|
|
continue
|
|
next_run = cron.get_next(datetime)
|
|
items.append(
|
|
CronTaskItem(
|
|
task_id=task.id,
|
|
processor_type=task.processor_type,
|
|
path=path,
|
|
cron=cron,
|
|
next_run=next_run,
|
|
)
|
|
)
|
|
self._items = items
|
|
|
|
def _build_cron(self, expr: str, base_time: datetime) -> croniter | None:
|
|
expr = str(expr or "").strip()
|
|
if not expr:
|
|
return None
|
|
parts = [p for p in expr.split() if p]
|
|
if len(parts) not in (5, 6):
|
|
return None
|
|
second_at_beginning = len(parts) == 6
|
|
try:
|
|
return croniter(expr, base_time, second_at_beginning=second_at_beginning)
|
|
except Exception:
|
|
return None
|
|
|
|
async def _run_loop(self) -> None:
|
|
while not self._stop_event.is_set():
|
|
if self._reload_event.is_set():
|
|
self._reload_event.clear()
|
|
await self._load_tasks()
|
|
now = datetime.now()
|
|
for item in list(self._items):
|
|
if item.next_run <= now:
|
|
await task_queue_service.add_task(
|
|
item.processor_type,
|
|
{"task_id": item.task_id, "path": item.path},
|
|
)
|
|
item.next_run = item.cron.get_next(datetime)
|
|
await asyncio.sleep(1)
|
|
|
|
|
|
task_scheduler = AutomationTaskScheduler()
|