重构工作流相关API,支持异步操作并引入异步数据库管理

This commit is contained in:
jxxghp
2025-07-30 18:21:13 +08:00
parent 5f6310f5d6
commit cd8661abc1
4 changed files with 306 additions and 66 deletions

View File

@@ -3,18 +3,19 @@ from datetime import datetime
from typing import List, Any, Optional
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
from app import schemas
from app.chain.workflow import WorkflowChain
from app.core.config import global_vars
from app.core.plugin import PluginManager
from app.core.security import verify_token
from app.core.workflow import WorkFlowManager
from app.db import get_db
from app.db import get_async_db, get_db
from app.db.models import Workflow
from app.db.systemconfig_oper import SystemConfigOper
from app.db.user_oper import get_current_active_user
from app.db.workflow_oper import WorkflowOper
from app.db.workflow_oper import AsyncWorkflowOper, WorkflowOper
from app.helper.workflow import WorkflowHelper
from app.scheduler import Scheduler
from app.schemas.types import EventType, EVENT_TYPE_NAMES
@@ -23,23 +24,22 @@ router = APIRouter()
@router.get("/", summary="所有工作流", response_model=List[schemas.Workflow])
def list_workflows(db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
async def list_workflows(db: AsyncSession = Depends(get_async_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
获取工作流列表
"""
from app.db.workflow_oper import WorkflowOper
return WorkflowOper(db).list()
return await AsyncWorkflowOper(db).list()
@router.post("/", summary="创建工作流", response_model=schemas.Response)
def create_workflow(workflow: schemas.Workflow,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
async def create_workflow(workflow: schemas.Workflow,
db: AsyncSession = Depends(get_async_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
创建工作流
"""
if workflow.name and WorkflowOper(db).get_by_name(workflow.name):
if workflow.name and await AsyncWorkflowOper(db).get_by_name(workflow.name):
return schemas.Response(success=False, message="已存在相同名称的工作流")
if not workflow.add_time:
workflow.add_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")
@@ -47,12 +47,13 @@ def create_workflow(workflow: schemas.Workflow,
workflow.state = "P"
if not workflow.trigger_type:
workflow.trigger_type = "timer"
Workflow(**workflow.dict()).create(db)
workflow_obj = Workflow(**workflow.dict())
await workflow_obj.async_create(db)
return schemas.Response(success=True, message="创建工作流成功")
@router.get("/plugin/actions", summary="查询插件动作", response_model=List[dict])
def list_plugin_actions(plugin_id: str = None, _: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
def list_plugin_actions(plugin_id: str = None, _: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
获取所有动作
"""
@@ -60,7 +61,7 @@ def list_plugin_actions(plugin_id: str = None, _: schemas.TokenPayload = Depends
@router.get("/actions", summary="所有动作", response_model=List[dict])
def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
async def list_actions(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
获取所有动作
"""
@@ -68,7 +69,7 @@ def list_actions(_: schemas.TokenPayload = Depends(get_current_active_user)) ->
@router.get("/event_types", summary="获取所有事件类型", response_model=List[dict])
def get_event_types(_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
async def get_event_types(_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
获取所有事件类型
"""
@@ -79,38 +80,38 @@ def get_event_types(_: schemas.TokenPayload = Depends(get_current_active_user))
@router.post("/share", summary="分享工作流", response_model=schemas.Response)
def workflow_share(
async def workflow_share(
workflow: schemas.WorkflowShare,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
分享工作流
"""
if not workflow.id or not workflow.share_title or not workflow.share_user:
return schemas.Response(success=False, message="请填写工作流ID、分享标题和分享人")
state, errmsg = WorkflowHelper().workflow_share(workflow_id=workflow.id,
share_title=workflow.share_title or "",
share_comment=workflow.share_comment or "",
share_user=workflow.share_user or "")
state, errmsg = await WorkflowHelper().async_workflow_share(workflow_id=workflow.id,
share_title=workflow.share_title or "",
share_comment=workflow.share_comment or "",
share_user=workflow.share_user or "")
return schemas.Response(success=state, message=errmsg)
@router.delete("/share/{share_id}", summary="删除分享", response_model=schemas.Response)
def workflow_share_delete(
async def workflow_share_delete(
share_id: int,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
删除分享
"""
state, errmsg = WorkflowHelper().share_delete(share_id=share_id)
state, errmsg = await WorkflowHelper().async_share_delete(share_id=share_id)
return schemas.Response(success=state, message=errmsg)
@router.post("/fork", summary="复用工作流", response_model=schemas.Response)
def workflow_fork(
async def workflow_fork(
workflow: schemas.WorkflowShare,
db: Session = Depends(get_db),
_: schemas.User = Depends(get_current_active_user)) -> Any:
db: AsyncSession = Depends(get_async_db),
_: schemas.User = Depends(verify_token)) -> Any:
"""
复用工作流
"""
@@ -148,36 +149,40 @@ def workflow_fork(
}
# 检查名称是否重复
if Workflow.get_by_name(db, workflow_dict["name"]):
workflow_oper = AsyncWorkflowOper(db)
if await workflow_oper.get_by_name(workflow_dict["name"]):
return schemas.Response(success=False, message="已存在相同名称的工作流")
# 创建新工作流
workflow = Workflow(**workflow_dict)
workflow.create(db)
workflow_obj = Workflow(**workflow_dict)
await workflow_obj.async_create(db)
# 获取工作流ID在数据库会话有效时
workflow = await workflow_oper.get_by_name(workflow_dict["name"])
# 更新复用次数
if workflow.id:
WorkflowHelper().workflow_fork(share_id=workflow.id)
if workflow:
await WorkflowHelper().async_workflow_fork(share_id=workflow.id)
return schemas.Response(success=True, message="复用成功")
@router.get("/shares", summary="查询分享的工作流", response_model=List[schemas.WorkflowShare])
def workflow_shares(
async def workflow_shares(
name: Optional[str] = None,
page: Optional[int] = 1,
count: Optional[int] = 30,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
查询分享的工作流
"""
return WorkflowHelper().get_shares(name=name, page=page, count=count)
return await WorkflowHelper().async_get_shares(name=name, page=page, count=count)
@router.post("/{workflow_id}/run", summary="执行工作流", response_model=schemas.Response)
def run_workflow(workflow_id: int,
from_begin: Optional[bool] = True,
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
执行工作流
"""
@@ -190,11 +195,10 @@ def run_workflow(workflow_id: int,
@router.post("/{workflow_id}/start", summary="启用工作流", response_model=schemas.Response)
def start_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
启用工作流
"""
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
@@ -212,7 +216,7 @@ def start_workflow(workflow_id: int,
@router.post("/{workflow_id}/pause", summary="停用工作流", response_model=schemas.Response)
def pause_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
停用工作流
"""
@@ -234,53 +238,52 @@ def pause_workflow(workflow_id: int,
@router.post("/{workflow_id}/reset", summary="重置工作流", response_model=schemas.Response)
def reset_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
async def reset_workflow(workflow_id: int,
db: AsyncSession = Depends(get_async_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
重置工作流
"""
from app.db.workflow_oper import WorkflowOper
workflow = WorkflowOper(db).get(workflow_id)
workflow = await AsyncWorkflowOper(db).get(workflow_id)
if not workflow:
return schemas.Response(success=False, message="工作流不存在")
# 停止工作流
global_vars.stop_workflow(workflow_id)
# 重置工作流
workflow.reset(db, workflow_id, reset_count=True)
await Workflow.async_reset(db, workflow_id, reset_count=True)
# 删除缓存
SystemConfigOper().delete(f"WorkflowCache-{workflow_id}")
return schemas.Response(success=True)
@router.get("/{workflow_id}", summary="工作流详情", response_model=schemas.Workflow)
def get_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
async def get_workflow(workflow_id: int,
db: AsyncSession = Depends(get_async_db),
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
获取工作流详情
"""
from app.db.workflow_oper import WorkflowOper
return WorkflowOper(db).get(workflow_id)
return await AsyncWorkflowOper(db).get(workflow_id)
@router.put("/{workflow_id}", summary="更新工作流", response_model=schemas.Response)
def update_workflow(workflow: schemas.Workflow,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
更新工作流
"""
if not workflow.id:
return schemas.Response(success=False, message="工作流ID不能为空")
wf = WorkflowOper(db).get(workflow.id)
workflow_oper = WorkflowOper(db)
wf = workflow_oper.get(workflow.id)
if not wf:
return schemas.Response(success=False, message="工作流不存在")
if not wf.trigger_type:
workflow.trigger_type = "timer"
wf.update(db, workflow.dict())
# 更新后的工作流对象
updated_workflow = wf.get(workflow.id)
updated_workflow = workflow_oper.get(workflow.id)
# 更新定时任务
Scheduler().update_workflow_job(updated_workflow)
# 更新事件注册
@@ -291,7 +294,7 @@ def update_workflow(workflow: schemas.Workflow,
@router.delete("/{workflow_id}", summary="删除工作流", response_model=schemas.Response)
def delete_workflow(workflow_id: int,
db: Session = Depends(get_db),
_: schemas.TokenPayload = Depends(get_current_active_user)) -> Any:
_: schemas.TokenPayload = Depends(verify_token)) -> Any:
"""
删除工作流
"""

View File

@@ -1,9 +1,10 @@
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, JSON, Sequence, String, and_, or_
from sqlalchemy import Column, Integer, JSON, Sequence, String, and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Base, db_query, db_update
from app.db import Base, db_query, db_update, async_db_query, async_db_update
class Workflow(Base):
@@ -48,11 +49,23 @@ class Workflow(Base):
def list(db):
return db.query(Workflow).all()
@staticmethod
@async_db_query
async def async_list(db: AsyncSession):
result = await db.execute(select(Workflow))
return result.scalars().all()
@staticmethod
@db_query
def get_enabled_workflows(db):
return db.query(Workflow).filter(Workflow.state != 'P').all()
@staticmethod
@async_db_query
async def async_get_enabled_workflows(db: AsyncSession):
result = await db.execute(select(Workflow).where(Workflow.state != 'P'))
return result.scalars().all()
@staticmethod
@db_query
def get_timer_triggered_workflows(db):
@@ -67,6 +80,21 @@ class Workflow(Base):
)
).all()
@staticmethod
@async_db_query
async def async_get_timer_triggered_workflows(db: AsyncSession):
"""异步获取定时触发的工作流"""
result = await db.execute(select(Workflow).where(
and_(
or_(
Workflow.trigger_type == 'timer',
not Workflow.trigger_type
),
Workflow.state != 'P'
)
))
return result.scalars().all()
@staticmethod
@db_query
def get_event_triggered_workflows(db):
@@ -78,17 +106,42 @@ class Workflow(Base):
)
).all()
@staticmethod
@async_db_query
async def async_get_event_triggered_workflows(db: AsyncSession):
"""异步获取事件触发的工作流"""
result = await db.execute(select(Workflow).where(
and_(
Workflow.trigger_type == 'event',
Workflow.state != 'P'
)
))
return result.scalars().all()
@staticmethod
@db_query
def get_by_name(db, name: str):
return db.query(Workflow).filter(Workflow.name == name).first()
@staticmethod
@async_db_query
async def async_get_by_name(db: AsyncSession, name: str):
result = await db.execute(select(Workflow).where(Workflow.name == name))
return result.scalars().first()
@staticmethod
@db_update
def update_state(db, wid: int, state: str):
db.query(Workflow).filter(Workflow.id == wid).update({"state": state})
return True
@staticmethod
@async_db_update
async def async_update_state(db: AsyncSession, wid: int, state: str):
from sqlalchemy import update
await db.execute(update(Workflow).where(Workflow.id == wid).values(state=state))
return True
@staticmethod
@db_update
def start(db, wid: int):
@@ -97,6 +150,13 @@ class Workflow(Base):
})
return True
@staticmethod
@async_db_update
async def async_start(db: AsyncSession, wid: int):
from sqlalchemy import update
await db.execute(update(Workflow).where(Workflow.id == wid).values(state='R'))
return True
@staticmethod
@db_update
def fail(db, wid: int, result: str):
@@ -107,6 +167,19 @@ class Workflow(Base):
})
return True
@staticmethod
@async_db_update
async def async_fail(db: AsyncSession, wid: int, result: str):
from sqlalchemy import update
await db.execute(update(Workflow).where(
and_(Workflow.id == wid, Workflow.state != "P")
).values(
state='F',
result=result,
last_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
))
return True
@staticmethod
@db_update
def success(db, wid: int, result: Optional[str] = None):
@@ -118,6 +191,20 @@ class Workflow(Base):
})
return True
@staticmethod
@async_db_update
async def async_success(db: AsyncSession, wid: int, result: Optional[str] = None):
from sqlalchemy import update
await db.execute(update(Workflow).where(
and_(Workflow.id == wid, Workflow.state != "P")
).values(
state='S',
result=result,
run_count=Workflow.run_count + 1,
last_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S')
))
return True
@staticmethod
@db_update
def reset(db, wid: int, reset_count: Optional[bool] = False):
@@ -129,6 +216,18 @@ class Workflow(Base):
})
return True
@staticmethod
@async_db_update
async def async_reset(db: AsyncSession, wid: int, reset_count: Optional[bool] = False):
from sqlalchemy import update
await db.execute(update(Workflow).where(Workflow.id == wid).values(
state='W',
result=None,
current_action=None,
run_count=0 if reset_count else Workflow.run_count,
))
return True
@staticmethod
@db_update
def update_current_action(db, wid: int, action_id: str, context: dict):
@@ -137,3 +236,18 @@ class Workflow(Base):
"context": context
})
return True
@staticmethod
@async_db_update
async def async_update_current_action(db: AsyncSession, wid: int, action_id: str, context: dict):
from sqlalchemy import update
# 先获取当前current_action
result = await db.execute(select(Workflow.current_action).where(Workflow.id == wid))
current_action = result.scalar()
new_current_action = current_action + f",{action_id}" if current_action else action_id
await db.execute(update(Workflow).where(Workflow.id == wid).values(
current_action=new_current_action,
context=context
))
return True

View File

@@ -1,6 +1,6 @@
from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, Any, Coroutine, Sequence
from app.db import DbOper
from app.db import DbOper, AsyncDbOper
from app.db.models.workflow import Workflow
@@ -84,3 +84,27 @@ class WorkflowOper(DbOper):
重置
"""
return Workflow.reset(self._db, wid, reset_count=reset_count)
class AsyncWorkflowOper(AsyncDbOper):
"""
异步工作流管理
"""
async def get(self, wid: int) -> Workflow:
"""
异步查询单个工作流
"""
return await Workflow.async_get(self._db, wid)
async def list(self) -> Coroutine[Any, Any, Sequence[Any]]:
"""
异步获取所有工作流列表
"""
return await Workflow.async_list(self._db)
async def get_by_name(self, name: str) -> Workflow:
"""
异步按名称获取工作流
"""
return await Workflow.async_get_by_name(self._db, name)

View File

@@ -3,9 +3,9 @@ from typing import List, Tuple, Optional
from app.core.cache import cached, cache_backend
from app.core.config import settings
from app.db.workflow_oper import WorkflowOper
from app.db.workflow_oper import WorkflowOper, AsyncWorkflowOper
from app.log import logger
from app.utils.http import RequestUtils
from app.utils.http import RequestUtils, AsyncRequestUtils
from app.utils.singleton import WeakSingleton
from app.utils.system import SystemUtils
@@ -35,7 +35,7 @@ class WorkflowHelper(metaclass=WeakSingleton):
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
# 获取工作流信息
workflow = WorkflowOper().get(workflow_id)
if not workflow:
@@ -51,7 +51,8 @@ class WorkflowHelper(metaclass=WeakSingleton):
workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or [])
# 发送分享请求
res = RequestUtils(proxies=settings.PROXY or {}, content_type="application/json",
res = RequestUtils(proxies=settings.PROXY or {},
content_type="application/json",
timeout=10).post(self._workflow_share,
json={
"share_title": share_title,
@@ -69,13 +70,55 @@ class WorkflowHelper(metaclass=WeakSingleton):
else:
return False, res.json().get("message")
async def async_workflow_share(self, workflow_id: int,
share_title: str, share_comment: str, share_user: str) -> Tuple[bool, str]:
"""
异步分享工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
# 获取工作流信息
workflow = await AsyncWorkflowOper().get(workflow_id)
if not workflow:
return False, "工作流不存在"
if not workflow.actions or not workflow.flows:
return False, "请分享有动作和流程的工作流"
workflow_dict = workflow.to_dict()
workflow_dict.pop("id", None)
workflow_dict.pop("context", None)
workflow_dict['actions'] = json.dumps(workflow_dict['actions'] or [])
workflow_dict['flows'] = json.dumps(workflow_dict['flows'] or [])
# 发送分享请求
res = await AsyncRequestUtils(proxies=settings.PROXY or {},
content_type="application/json",
timeout=10).post(self._workflow_share,
json={
"share_title": share_title,
"share_comment": share_comment,
"share_user": share_user,
"share_uid": self._share_user_id,
**workflow_dict
})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.status_code == 200:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def share_delete(self, share_id: int) -> Tuple[bool, str]:
"""
删除分享
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {},
timeout=5).delete_res(f"{self._workflow_share}/{share_id}",
params={"share_uid": self._share_user_id})
@@ -88,13 +131,32 @@ class WorkflowHelper(metaclass=WeakSingleton):
else:
return False, res.json().get("message")
async def async_share_delete(self, share_id: int) -> Tuple[bool, str]:
"""
异步删除分享
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = await AsyncRequestUtils(proxies=settings.PROXY or {},
timeout=5).delete_res(f"{self._workflow_share}/{share_id}",
params={"share_uid": self._share_user_id})
if res is None:
return False, "连接MoviePilot服务器失败"
if res.status_code == 200:
# 清除 get_shares 的缓存,以便实时看到结果
cache_backend.clear(region=self._shares_cache_region)
return True, ""
else:
return False, res.json().get("message")
def workflow_fork(self, share_id: int) -> Tuple[bool, str]:
"""
复用分享的工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = RequestUtils(proxies=settings.PROXY or {}, timeout=5, headers={
"Content-Type": "application/json"
}).get_res(self._workflow_fork % share_id)
@@ -105,6 +167,25 @@ class WorkflowHelper(metaclass=WeakSingleton):
else:
return False, res.json().get("message")
async def async_workflow_fork(self, share_id: int) -> Tuple[bool, str]:
"""
异步复用分享的工作流
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return False, "当前没有开启工作流数据共享功能"
res = await AsyncRequestUtils(proxies=settings.PROXY or {},
timeout=5,
headers={
"Content-Type": "application/json"
}).get_res(self._workflow_fork % share_id)
if res is None:
return False, "连接MoviePilot服务器失败"
if res.status_code == 200:
return True, ""
else:
return False, res.json().get("message")
@cached(region=_shares_cache_region, maxsize=1, skip_empty=True)
def get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> List[dict]:
"""
@@ -112,7 +193,7 @@ class WorkflowHelper(metaclass=WeakSingleton):
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return []
res = RequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={
"name": name,
"page": page,
@@ -122,6 +203,24 @@ class WorkflowHelper(metaclass=WeakSingleton):
return res.json()
return []
@cached(region=_shares_cache_region, maxsize=1, skip_empty=True)
async def async_get_shares(self, name: Optional[str] = None, page: Optional[int] = 1, count: Optional[int] = 30) -> \
List[dict]:
"""
异步获取工作流分享数据
"""
if not settings.WORKFLOW_STATISTIC_SHARE: # 使用独立的工作流分享开关
return []
res = await AsyncRequestUtils(proxies=settings.PROXY or {}, timeout=15).get_res(self._workflow_shares, params={
"name": name,
"page": page,
"count": count
})
if res and res.status_code == 200:
return res.json()
return []
def get_user_uuid(self) -> str:
"""
获取用户uuid
@@ -129,4 +228,4 @@ class WorkflowHelper(metaclass=WeakSingleton):
if not self._share_user_id:
self._share_user_id = SystemUtils.generate_user_unique_id()
logger.info(f"当前用户UUID: {self._share_user_id}")
return self._share_user_id or ""
return self._share_user_id or ""