fix workflow api

This commit is contained in:
jxxghp
2025-02-25 17:27:21 +08:00
parent 0dc9c98c06
commit 88993cb67b
2 changed files with 51 additions and 14 deletions

View File

@@ -9,6 +9,7 @@ from app.db import get_db
from app.db.models.workflow import Workflow
from app.db.user_oper import get_current_active_user
from app.chain.workflow import WorkflowChain
from app.scheduler import Scheduler
router = APIRouter()
@@ -67,17 +68,52 @@ def delete_workflow(workflow_id: int,
"""
删除工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().remove_workflow_job(workflow)
Workflow.delete(db, workflow_id)
return schemas.Response(success=True, message="删除成功")
@router.get("/run/{workfow_id}", summary="执行工作流", response_model=schemas.Response)
def run_workflow(workfow_id: int,
@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response)
def run_workflow(workflow_id: int,
from_begin: bool = True,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
执行工作流
"""
if WorkflowChain().process(workfow_id, from_begin=from_begin):
return schemas.Response(success=True, message="执行成功")
return schemas.Response(success=False, message="执行失败")
state, errmsg = WorkflowChain().process(workflow_id, from_begin=from_begin)
if not state:
return schemas.Response(success=False, message=errmsg)
return schemas.Response(success=True)
@router.post("/{workflow_id}/start", summary="启用工作流", response_model=schemas.Response)
def start_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
启用工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().remove_workflow_job(workflow)
workflow.update_state(db, workflow_id, "W")
return schemas.Response(success=True)
@router.post("/{workflow_id}/pause", summary="停用工作流", response_model=schemas.Response)
def pause_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
"""
停用工作流
"""
workflow = Workflow.get(db, workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
Scheduler().remove_workflow_job(workflow)
workflow.update_state(db, workflow_id, "P")
return schemas.Response(success=True)

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import List
from typing import List, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -21,7 +21,7 @@ class WorkflowChain(ChainBase):
self.workflowoper = WorkflowOper()
self.workflowmanager = WorkFlowManager()
def process(self, workflow_id: int, from_begin: bool = True) -> bool:
def process(self, workflow_id: int, from_begin: bool = True) -> Tuple[bool, str]:
"""
处理工作流
:param workflow_id: 工作流ID
@@ -57,15 +57,16 @@ class WorkflowChain(ChainBase):
workflow = self.workflowoper.get(workflow_id)
if not workflow:
logger.warn(f"工作流 {workflow_id} 不存在")
return False
return False, "工作流不存在"
if not workflow.actions:
logger.warn(f"工作流 {workflow.name} 无动作")
return False
return False, "工作流无动作"
if not workflow.flows:
logger.warn(f"工作流 {workflow.name} 无流程")
return False
return False, "工作流无流程"
logger.info(f"开始处理 {workflow.name},共 {len(workflow.actions)} 个动作 ...")
self.workflowoper.start(workflow_id)
# 启用上下文
if not from_begin and workflow.current_action:
@@ -113,8 +114,8 @@ class WorkflowChain(ChainBase):
context=context.dict())
if success_count < len(next_actions):
logger.error(f"动作 {current_acttion_names} 未全部成功,工作流失败")
self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 执行失败")
return False
self.workflowoper.fail(workflow_id, result=f"动作 {current_acttion_names} 未全部成功")
return False, f"动作 {current_acttion_names} 未全部成功"
else:
logger.info(f"动作 {current_acttion_names} 执行完成,耗时:{(end_time - start_time).seconds}")
else:
@@ -132,12 +133,12 @@ class WorkflowChain(ChainBase):
if not state:
logger.error(f"动作 {action.name} 执行失败,工作流失败")
self.workflowoper.fail(workflow_id, result=f"动作 {action.name} 执行失败")
return False
return False, f"动作 {action.name} 执行失败"
logger.info(f"动作 {action.name} 执行完成,耗时:{(end_time - start_time).seconds}")
logger.info(f"工作流 {workflow.name} 执行完成")
self.workflowoper.success(workflow_id)
return True
return True, ""
def get_workflows(self) -> List[Workflow]:
"""