diff --git a/app/api/endpoints/workflow.py b/app/api/endpoints/workflow.py index fa46c123..bde10c6f 100644 --- a/app/api/endpoints/workflow.py +++ b/app/api/endpoints/workflow.py @@ -9,6 +9,7 @@ from app.db import get_db from app.db.models.workflow import Workflow from app.db.user_oper import get_current_active_user from app.chain.workflow import WorkflowChain +from app.scheduler import Scheduler router = APIRouter() @@ -67,17 +68,52 @@ def delete_workflow(workflow_id: int, """ 删除工作流 """ + workflow = Workflow.get(db, workflow_id) + if not workflow: + return schemas.Response(success=False, message="工作流不存在") + Scheduler().remove_workflow_job(workflow) Workflow.delete(db, workflow_id) return schemas.Response(success=True, message="删除成功") -@router.get("/run/{workfow_id}", summary="执行工作流", response_model=schemas.Response) -def run_workflow(workfow_id: int, +@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response) +def run_workflow(workflow_id: int, from_begin: bool = True, _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: """ 执行工作流 """ - if WorkflowChain().process(workfow_id, from_begin=from_begin): - return schemas.Response(success=True, message="执行成功") - return schemas.Response(success=False, message="执行失败") + state, errmsg = WorkflowChain().process(workflow_id, from_begin=from_begin) + if not state: + return schemas.Response(success=False, message=errmsg) + return schemas.Response(success=True) + + +@router.post("/{workflow_id}/start", summary="启用工作流", response_model=schemas.Response) +def start_workflow(workflow_id: int, + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 启用工作流 + """ + workflow = Workflow.get(db, workflow_id) + if not workflow: + return schemas.Response(success=False, message="工作流不存在") + Scheduler().remove_workflow_job(workflow) + workflow.update_state(db, workflow_id, "W") + return schemas.Response(success=True) + + +@router.post("/{workflow_id}/pause", summary="停用工作流", response_model=schemas.Response) +def pause_workflow(workflow_id: int, + db: Session = Depends(get_db), + _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any: + """ + 停用工作流 + """ + workflow = Workflow.get(db, workflow_id) + if not workflow: + return schemas.Response(success=False, message="工作流不存在") + Scheduler().remove_workflow_job(workflow) + workflow.update_state(db, workflow_id, "P") + return schemas.Response(success=True) diff --git a/app/chain/workflow.py b/app/chain/workflow.py index 50facaf2..6bdc8c56 100644 --- a/app/chain/workflow.py +++ b/app/chain/workflow.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import List +from typing import List, Tuple from concurrent.futures import ThreadPoolExecutor, as_completed @@ -21,7 +21,7 @@ class WorkflowChain(ChainBase): self.workflowoper = WorkflowOper() self.workflowmanager = WorkFlowManager() - def process(self, workflow_id: int, from_begin: bool = True) -> bool: + def process(self, workflow_id: int, from_begin: bool = True) -> Tuple[bool, str]: """ 处理工作流 :param workflow_id: 工作流ID @@ -57,15 +57,16 @@ class WorkflowChain(ChainBase): workflow = self.workflowoper.get(workflow_id) if not workflow: logger.warn(f"工作流 {workflow_id} 不存在") - return False + return False, "工作流不存在" if not workflow.actions: logger.warn(f"工作流 {workflow.name} 无动作") - return False + return False, "工作流无动作" if not workflow.flows: logger.warn(f"工作流 {workflow.name} 无流程") - return False + return False, "工作流无流程" logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...") + self.workflowoper.start(workflow_id) # 启用上下文 if not from_begin and workflow.current_action: @@ -113,8 +114,8 @@ class WorkflowChain(ChainBase): context=context.dict()) if success_count < len(next_actions): logger.error(f"动作 {current_acttion_names} 未全部成功,工作流失败") - self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 执行失败") - return False + self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 未全部成功") + return False, f"动作 {current_acttion_names} 未全部成功" else: logger.info(f"动作 {current_acttion_names} 执行完成,耗时:{(end_time - start_time).seconds} 秒") else: @@ -132,12 +133,12 @@ class WorkflowChain(ChainBase): if not state: logger.error(f"动作 {action.name} 执行失败,工作流失败") self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败") - return False + return False, f"动作 {action.name} 执行失败" logger.info(f"动作 {action.name} 执行完成,耗时:{(end_time - start_time).seconds} 秒") logger.info(f"工作流 {workflow.name} 执行完成") self.workflowoper.success(workflow_id) - return True + return True, "" def get_workflows(self) -> List[Workflow]: """