diff --git a/domain/tasks/__init__.py b/domain/tasks/__init__.py index d8301f1..d6d86bc 100644 --- a/domain/tasks/__init__.py +++ b/domain/tasks/__init__.py @@ -1,4 +1,5 @@ from .service import TaskService +from .scheduler import task_scheduler from .task_queue import Task, TaskProgress, TaskStatus, task_queue_service from .types import ( AutomationTaskBase, @@ -15,6 +16,7 @@ __all__ = [ "TaskProgress", "TaskStatus", "task_queue_service", + "task_scheduler", "AutomationTaskBase", "AutomationTaskCreate", "AutomationTaskRead", diff --git a/domain/tasks/api.py b/domain/tasks/api.py index a7fba9e..3279f0a 100644 --- a/domain/tasks/api.py +++ b/domain/tasks/api.py @@ -59,8 +59,7 @@ async def get_task_status(task_id: str, request: Request, current_user: CurrentU body_fields=[ "name", "event", - "path_pattern", - "filename_regex", + "trigger_config", "processor_type", "processor_config", "enabled", @@ -93,8 +92,7 @@ async def list_tasks(request: Request, current_user: CurrentUser): body_fields=[ "name", "event", - "path_pattern", - "filename_regex", + "trigger_config", "processor_type", "processor_config", "enabled", diff --git a/domain/tasks/scheduler.py b/domain/tasks/scheduler.py new file mode 100644 index 0000000..d4b0a64 --- /dev/null +++ b/domain/tasks/scheduler.py @@ -0,0 +1,102 @@ +import asyncio +from dataclasses import dataclass +from datetime import datetime + +from croniter import croniter + +from models.database import AutomationTask +from .task_queue import task_queue_service + + +@dataclass +class CronTaskItem: + task_id: int + processor_type: str + path: str + cron: croniter + next_run: datetime + + +class AutomationTaskScheduler: + def __init__(self): + self._items: list[CronTaskItem] = [] + self._worker: asyncio.Task | None = None + self._reload_event = asyncio.Event() + self._stop_event = asyncio.Event() + + async def start(self) -> None: + if self._worker and not self._worker.done(): + return + self._stop_event.clear() + await self._load_tasks() + self._worker = asyncio.create_task(self._run_loop()) + + async def stop(self) -> None: + if not self._worker: + return + self._stop_event.set() + self._reload_event.set() + await self._worker + self._worker = None + + def refresh(self) -> None: + if self._worker and not self._worker.done(): + self._reload_event.set() + + async def _load_tasks(self) -> None: + tasks = await AutomationTask.filter(event="cron", enabled=True) + items: list[CronTaskItem] = [] + now = datetime.now() + for task in tasks: + trigger = task.trigger_config or {} + if not isinstance(trigger, dict): + continue + cron_expr = trigger.get("cron_expr") + path = trigger.get("path") + if not cron_expr or not path: + continue + cron = self._build_cron(cron_expr, now) + if not cron: + continue + next_run = cron.get_next(datetime) + items.append( + CronTaskItem( + task_id=task.id, + processor_type=task.processor_type, + path=path, + cron=cron, + next_run=next_run, + ) + ) + self._items = items + + def _build_cron(self, expr: str, base_time: datetime) -> croniter | None: + expr = str(expr or "").strip() + if not expr: + return None + parts = [p for p in expr.split() if p] + if len(parts) not in (5, 6): + return None + second_at_beginning = len(parts) == 6 + try: + return croniter(expr, base_time, second_at_beginning=second_at_beginning) + except Exception: + return None + + async def _run_loop(self) -> None: + while not self._stop_event.is_set(): + if self._reload_event.is_set(): + self._reload_event.clear() + await self._load_tasks() + now = datetime.now() + for item in list(self._items): + if item.next_run <= now: + await task_queue_service.add_task( + item.processor_type, + {"task_id": item.task_id, "path": item.path}, + ) + item.next_run = item.cron.get_next(datetime) + await asyncio.sleep(1) + + +task_scheduler = AutomationTaskScheduler() diff --git a/domain/tasks/service.py b/domain/tasks/service.py index 321b9ae..3f697d7 100644 --- a/domain/tasks/service.py +++ b/domain/tasks/service.py @@ -5,6 +5,7 @@ from fastapi import Depends, HTTPException from domain.auth import User, get_current_active_user from domain.config import ConfigService +from .scheduler import task_scheduler from .task_queue import task_queue_service from .types import ( AutomationTaskCreate, @@ -46,6 +47,7 @@ class TaskService: @classmethod async def create_task(cls, payload: AutomationTaskCreate, user: Optional[User]) -> AutomationTask: task = await AutomationTask.create(**payload.model_dump()) + task_scheduler.refresh() return task @classmethod @@ -69,6 +71,7 @@ class TaskService: for key, value in update_data.items(): setattr(task, key, value) await task.save() + task_scheduler.refresh() return task @classmethod @@ -76,6 +79,7 @@ class TaskService: deleted_count = await AutomationTask.filter(id=task_id).delete() if not deleted_count: raise HTTPException(status_code=404, detail=f"Task {task_id} not found") + task_scheduler.refresh() @classmethod async def trigger_tasks(cls, event: str, path: str): @@ -86,11 +90,16 @@ class TaskService: @classmethod def match(cls, task: AutomationTask, path: str) -> bool: - if task.path_pattern and not path.startswith(task.path_pattern): + trigger_config = task.trigger_config or {} + if not isinstance(trigger_config, dict): + trigger_config = {} + path_prefix = trigger_config.get("path_prefix") + filename_regex = trigger_config.get("filename_regex") + if path_prefix and not path.startswith(path_prefix): return False - if task.filename_regex: + if filename_regex: filename = path.split("/")[-1] - if not re.match(task.filename_regex, filename): + if not re.match(filename_regex, filename): return False return True diff --git a/domain/tasks/task_queue.py b/domain/tasks/task_queue.py index 972c637..e9447f4 100644 --- a/domain/tasks/task_queue.py +++ b/domain/tasks/task_queue.py @@ -88,32 +88,27 @@ class TaskQueueService: task.result = result elif task.name == "automation_task" or self._is_processor_task(task.name): from models.database import AutomationTask - from domain.processors import get_processor params = task.task_info auto_task = await AutomationTask.get(id=params["task_id"]) path = params["path"] - processor_type = auto_task.processor_type if task.name == "automation_task" else task.name - processor = get_processor(processor_type) - if not processor: - raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}") - - if processor_type != auto_task.processor_type: - processor_type = auto_task.processor_type - processor = get_processor(processor_type) - if not processor: - raise ValueError(f"Processor {processor_type} not found for task {auto_task.id}") - - requires_input_bytes = bool(getattr(processor, "requires_input_bytes", True)) - file_content = b"" - if requires_input_bytes: - file_content = await VirtualFSService.read_file(path) - result = await processor.process(file_content, path, auto_task.processor_config) - - save_to = auto_task.processor_config.get("save_to") - if save_to and getattr(processor, "produces_file", False): - await VirtualFSService.write_file(save_to, result) + processor_type = auto_task.processor_type + config = auto_task.processor_config or {} + save_to = config.get("save_to") if isinstance(config, dict) else None + overwrite = bool(config.get("overwrite")) if isinstance(config, dict) else False + try: + if await VirtualFSService.path_is_directory(path): + overwrite = True + except Exception: + pass + await VirtualFSService.process_file( + path=path, + processor_type=processor_type, + config=config if isinstance(config, dict) else {}, + save_to=save_to, + overwrite=overwrite, + ) task.result = "Automation task completed" elif task.name == "offline_http_download": from domain.offline_downloads import OfflineDownloadService @@ -129,7 +124,6 @@ class TaskQueueService: task.result = "Email sent" else: raise ValueError(f"Unknown task name: {task.name}") - task.status = TaskStatus.SUCCESS except Exception as e: diff --git a/domain/tasks/types.py b/domain/tasks/types.py index 74bc0e7..4d2967e 100644 --- a/domain/tasks/types.py +++ b/domain/tasks/types.py @@ -6,8 +6,7 @@ from pydantic import BaseModel, Field class AutomationTaskBase(BaseModel): name: str event: str - path_pattern: Optional[str] = None - filename_regex: Optional[str] = None + trigger_config: Dict[str, Any] = {} processor_type: str processor_config: Dict[str, Any] = {} enabled: bool = True @@ -22,6 +21,7 @@ class AutomationTaskUpdate(AutomationTaskBase): event: Optional[str] = None processor_type: Optional[str] = None processor_config: Optional[Dict[str, Any]] = None + trigger_config: Optional[Dict[str, Any]] = None enabled: Optional[bool] = None diff --git a/main.py b/main.py index c582ad9..927e8f8 100644 --- a/main.py +++ b/main.py @@ -20,7 +20,7 @@ from middleware.exception_handler import ( ) import httpx from dotenv import load_dotenv -from domain.tasks import task_queue_service +from domain.tasks import task_queue_service, task_scheduler load_dotenv() @@ -73,6 +73,7 @@ async def lifespan(app: FastAPI): # 加载已安装的插件 from domain.plugins import init_plugins await init_plugins(app) + await task_scheduler.start() # 在所有路由加载完成后,挂载静态文件服务(放在最后以避免覆盖 API 路由) app.mount("/", SPAStaticFiles(directory="web/dist", html=True, check_dir=False), name="static") @@ -80,6 +81,7 @@ async def lifespan(app: FastAPI): try: yield finally: + await task_scheduler.stop() await task_queue_service.stop_worker() await close_db() diff --git a/models/database.py b/models/database.py index 7a92de2..b9a39e4 100644 --- a/models/database.py +++ b/models/database.py @@ -116,8 +116,7 @@ class AutomationTask(Model): name = fields.CharField(max_length=100) event = fields.CharField(max_length=50) - path_pattern = fields.CharField(max_length=1024, null=True) - filename_regex = fields.CharField(max_length=255, null=True) + trigger_config = fields.JSONField(null=True) processor_type = fields.CharField(max_length=100) processor_config = fields.JSONField() diff --git a/pyproject.toml b/pyproject.toml index 6c37b63..9563cc2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,7 @@ requires-python = ">=3.14" dependencies = [ "aioboto3>=15.5.0", "bcrypt>=5.0.0", + "croniter>=6.0.0", "fastapi>=0.127.0", "paramiko>=4.0.0", "pillow>=12.0.0", diff --git a/uv.lock b/uv.lock index ced61f2..b43392a 100644 --- a/uv.lock +++ b/uv.lock @@ -318,6 +318,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "croniter" +version = "6.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "python-dateutil" }, + { name = "pytz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/ad/2f/44d1ae153a0e27be56be43465e5cb39b9650c781e001e7864389deb25090/croniter-6.0.0.tar.gz", hash = "sha256:37c504b313956114a983ece2c2b07790b1f1094fe9d81cc94739214748255577", size = 64481, upload-time = "2024-12-17T17:17:47.32Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/4b/290b4c3efd6417a8b0c284896de19b1d5855e6dbdb97d2a35e68fa42de85/croniter-6.0.0-py2.py3-none-any.whl", hash = "sha256:2f878c3856f17896979b2a4379ba1f09c83e374931ea15cc835c5dd2eee9b368", size = 25468, upload-time = "2024-12-17T17:17:45.359Z" }, +] + [[package]] name = "cryptography" version = "46.0.3" @@ -418,6 +431,7 @@ source = { virtual = "." } dependencies = [ { name = "aioboto3" }, { name = "bcrypt" }, + { name = "croniter" }, { name = "fastapi" }, { name = "paramiko" }, { name = "pillow" }, @@ -437,6 +451,7 @@ dependencies = [ requires-dist = [ { name = "aioboto3", specifier = ">=15.5.0" }, { name = "bcrypt", specifier = ">=5.0.0" }, + { name = "croniter", specifier = ">=6.0.0" }, { name = "fastapi", specifier = ">=0.127.0" }, { name = "paramiko", specifier = ">=4.0.0" }, { name = "pillow", specifier = ">=12.0.0" }, diff --git a/web/src/api/tasks.ts b/web/src/api/tasks.ts index e04f58f..ddec7af 100644 --- a/web/src/api/tasks.ts +++ b/web/src/api/tasks.ts @@ -5,8 +5,7 @@ export interface AutomationTask { id: number; name: string; event: string; - path_pattern?: string; - filename_regex?: string; + trigger_config?: Record; processor_type: string; processor_config: Record; enabled: boolean; diff --git a/web/src/i18n/locales/en.json b/web/src/i18n/locales/en.json index 468c267..000102d 100644 --- a/web/src/i18n/locales/en.json +++ b/web/src/i18n/locales/en.json @@ -521,9 +521,12 @@ "Trigger Event": "Trigger Event", "File Written": "File Written", "File Deleted": "File Deleted", + "Scheduled": "Scheduled", "Matching Rules": "Matching Rules", "Path Prefix (optional)": "Path Prefix (optional)", "Filename Regex (optional)": "Filename Regex (optional)", + "Schedule": "Schedule", + "Cron Expression": "Cron Expression", "Action": "Action", "Current Task Queue": "Current Task Queue", "Params": "Params", diff --git a/web/src/i18n/locales/zh.json b/web/src/i18n/locales/zh.json index af2cc52..6278a92 100644 --- a/web/src/i18n/locales/zh.json +++ b/web/src/i18n/locales/zh.json @@ -512,9 +512,12 @@ "Trigger Event": "触发事件", "File Written": "文件写入", "File Deleted": "文件删除", + "Scheduled": "定时任务", "Matching Rules": "匹配规则", "Path Prefix (optional)": "路径前缀 (可选)", "Filename Regex (optional)": "文件名正则 (可选)", + "Schedule": "定时设置", + "Cron Expression": "Cron 表达式", "Action": "执行动作", "Current Task Queue": "当前任务队列", "Params": "参数", @@ -646,7 +649,6 @@ "Created (newest)": "创建时间(最新)", "Installed already": "已安装", "No results": "暂无结果", - "Downloading": "下载中", "Download and Install": "下载并安装", "Loading apps": "加载应用中", "Failed to load apps": "加载应用失败", diff --git a/web/src/pages/TasksPage.tsx b/web/src/pages/TasksPage.tsx index 279b0ad..5f21c10 100644 --- a/web/src/pages/TasksPage.tsx +++ b/web/src/pages/TasksPage.tsx @@ -15,7 +15,7 @@ const TasksPage = memo(function TasksPage() { const [form] = Form.useForm(); const [availableProcessors, setAvailableProcessors] = useState([]); const { t } = useI18n(); - const [pathPickerOpen, setPathPickerOpen] = useState(false); + const [pathPickerField, setPathPickerField] = useState<'path_prefix' | 'cron_path' | null>(null); const fetchList = useCallback(async () => { setLoading(true); @@ -42,7 +42,8 @@ const TasksPage = memo(function TasksPage() { name: '', event: 'file_written', enabled: true, - processor_config: {} + processor_config: {}, + trigger_config: {} }); setOpen(true); }; @@ -52,7 +53,8 @@ const TasksPage = memo(function TasksPage() { form.resetFields(); form.setFieldsValue({ ...rec, - processor_config: rec.processor_config || {} + processor_config: rec.processor_config || {}, + trigger_config: rec.trigger_config || {} }); setOpen(true); }; @@ -60,7 +62,15 @@ const TasksPage = memo(function TasksPage() { const submit = async () => { try { const values = await form.validateFields(); - const body = { ...values }; + const triggerConfig = { ...(values.trigger_config || {}) }; + if (values.event === 'cron') { + delete triggerConfig.path_prefix; + delete triggerConfig.filename_regex; + } else { + delete triggerConfig.cron_expr; + delete triggerConfig.path; + } + const body = { ...values, trigger_config: triggerConfig }; setLoading(true); if (editing) { await tasksApi.update(editing.id, body); @@ -133,7 +143,10 @@ const TasksPage = memo(function TasksPage() { const selectedProcessor = Form.useWatch('processor_type', form); const currentProcessorMeta = availableProcessors.find(p => p.type === selectedProcessor); - const watchedPathPattern = Form.useWatch('path_pattern', form); + const selectedEvent = Form.useWatch('event', form); + const watchedPathPrefix = Form.useWatch(['trigger_config', 'path_prefix'], form); + const watchedCronPath = Form.useWatch(['trigger_config', 'path'], form); + const isCron = selectedEvent === 'cron'; return ( @@ -158,11 +171,11 @@ const TasksPage = memo(function TasksPage() { title={editing ? `${t('Edit Task')}: ${editing.name}` : t('Create Automation Task')} width={480} open={open} - onClose={() => { setOpen(false); setEditing(null); }} + onClose={() => { setOpen(false); setEditing(null); setPathPickerField(null); }} destroyOnHidden extra={ - + } @@ -174,19 +187,45 @@ const TasksPage = memo(function TasksPage() { setPathPickerOpen(true)}>{t('Select')}} - /> - - - - + {isCron ? ( + <> + {t('Schedule')} + + + + + setPathPickerField('cron_path')}>{t('Select')}} + /> + + + ) : ( + <> + {t('Matching Rules')} + + setPathPickerField('path_prefix')}>{t('Select')}} + /> + + + + + + )} @@ -205,11 +244,18 @@ const TasksPage = memo(function TasksPage() { setPathPickerOpen(false)} - onOk={(p) => { form.setFieldsValue({ path_pattern: p }); setPathPickerOpen(false); }} + open={!!pathPickerField} + mode={pathPickerField === 'cron_path' ? 'any' : 'directory'} + initialPath={(pathPickerField === 'cron_path' ? watchedCronPath : watchedPathPrefix) || '/'} + onCancel={() => setPathPickerField(null)} + onOk={(p) => { + if (pathPickerField === 'cron_path') { + form.setFieldValue(['trigger_config', 'path'], p); + } else if (pathPickerField === 'path_prefix') { + form.setFieldValue(['trigger_config', 'path_prefix'], p); + } + setPathPickerField(null); + }} /> );