Files
Foxel/domain/role/service.py

289 lines
9.3 KiB
Python

from typing import List
from fastapi import HTTPException
from models.database import Role, RolePermission, PathRule, UserRole
from domain.permission.service import PermissionService
from domain.permission.types import PathRuleCreate, PathRuleInfo, PERMISSION_DEFINITIONS
from .types import RoleInfo, RoleDetail, RoleCreate, RoleUpdate, SystemRoles
class RoleService:
"""角色管理服务"""
@classmethod
async def get_all_roles(cls) -> List[RoleInfo]:
"""获取所有角色"""
roles = await Role.all().order_by("id")
return [
RoleInfo(
id=r.id,
name=r.name,
description=r.description,
is_system=r.is_system,
created_at=r.created_at,
)
for r in roles
]
@classmethod
async def get_role(cls, role_id: int) -> RoleDetail:
"""获取角色详情"""
role = await Role.get_or_none(id=role_id)
if not role:
raise HTTPException(404, detail="角色不存在")
# 获取权限
role_permissions = await RolePermission.filter(role_id=role_id)
permissions = sorted(set(rp.permission_code for rp in role_permissions))
# 获取路径规则数量
path_rules_count = await PathRule.filter(role_id=role_id).count()
return RoleDetail(
id=role.id,
name=role.name,
description=role.description,
is_system=role.is_system,
created_at=role.created_at,
permissions=permissions,
path_rules_count=path_rules_count,
)
@classmethod
async def create_role(cls, data: RoleCreate) -> RoleInfo:
"""创建角色"""
# 检查名称是否已存在
existing = await Role.get_or_none(name=data.name)
if existing:
raise HTTPException(400, detail="角色名称已存在")
role = await Role.create(
name=data.name,
description=data.description,
is_system=False,
)
return RoleInfo(
id=role.id,
name=role.name,
description=role.description,
is_system=role.is_system,
created_at=role.created_at,
)
@classmethod
async def update_role(cls, role_id: int, data: RoleUpdate) -> RoleInfo:
"""更新角色"""
role = await Role.get_or_none(id=role_id)
if not role:
raise HTTPException(404, detail="角色不存在")
if data.name is not None:
# 检查名称是否与其他角色冲突
existing = await Role.filter(name=data.name).exclude(id=role_id).first()
if existing:
raise HTTPException(400, detail="角色名称已存在")
role.name = data.name
if data.description is not None:
role.description = data.description
await role.save()
return RoleInfo(
id=role.id,
name=role.name,
description=role.description,
is_system=role.is_system,
created_at=role.created_at,
)
@classmethod
async def delete_role(cls, role_id: int) -> None:
"""删除角色"""
role = await Role.get_or_none(id=role_id)
if not role:
raise HTTPException(404, detail="角色不存在")
if role.is_system:
raise HTTPException(400, detail="系统内置角色不可删除")
# 检查是否有用户使用此角色
user_count = await UserRole.filter(role_id=role_id).count()
if user_count > 0:
raise HTTPException(400, detail=f"{user_count} 个用户正在使用此角色,无法删除")
await role.delete()
# 清除权限缓存
PermissionService.clear_cache()
@classmethod
async def set_role_permissions(cls, role_id: int, permission_codes: List[str]) -> List[str]:
"""设置角色的权限"""
role = await Role.get_or_none(id=role_id)
if not role:
raise HTTPException(404, detail="角色不存在")
all_permission_codes = {item["code"] for item in PERMISSION_DEFINITIONS}
invalid_codes = set(permission_codes) - all_permission_codes
if invalid_codes:
raise HTTPException(400, detail=f"无效的权限代码: {', '.join(invalid_codes)}")
# 删除现有权限
await RolePermission.filter(role_id=role_id).delete()
# 添加新权限
for code in permission_codes:
await RolePermission.create(
role_id=role_id,
permission_code=code,
)
# 清除权限缓存
PermissionService.clear_cache()
return list(permission_codes)
@classmethod
async def get_role_path_rules(cls, role_id: int) -> List[PathRuleInfo]:
"""获取角色的路径规则"""
role = await Role.get_or_none(id=role_id)
if not role:
raise HTTPException(404, detail="角色不存在")
rules = await PathRule.filter(role_id=role_id).order_by("-priority", "id")
return [
PathRuleInfo(
id=r.id,
role_id=r.role_id,
path_pattern=r.path_pattern,
is_regex=r.is_regex,
can_read=r.can_read,
can_write=r.can_write,
can_delete=r.can_delete,
can_share=r.can_share,
priority=r.priority,
created_at=r.created_at,
)
for r in rules
]
@classmethod
async def add_path_rule(cls, role_id: int, data: PathRuleCreate) -> PathRuleInfo:
"""添加路径规则"""
role = await Role.get_or_none(id=role_id)
if not role:
raise HTTPException(404, detail="角色不存在")
# 验证路径模式
if data.is_regex:
import re
try:
re.compile(data.path_pattern)
except re.error as e:
raise HTTPException(400, detail=f"无效的正则表达式: {e}")
rule = await PathRule.create(
role_id=role_id,
path_pattern=data.path_pattern,
is_regex=data.is_regex,
can_read=data.can_read,
can_write=data.can_write,
can_delete=data.can_delete,
can_share=data.can_share,
priority=data.priority,
)
# 清除权限缓存
PermissionService.clear_cache()
return PathRuleInfo(
id=rule.id,
role_id=rule.role_id,
path_pattern=rule.path_pattern,
is_regex=rule.is_regex,
can_read=rule.can_read,
can_write=rule.can_write,
can_delete=rule.can_delete,
can_share=rule.can_share,
priority=rule.priority,
created_at=rule.created_at,
)
@classmethod
async def update_path_rule(cls, rule_id: int, data: PathRuleCreate) -> PathRuleInfo:
"""更新路径规则"""
rule = await PathRule.get_or_none(id=rule_id)
if not rule:
raise HTTPException(404, detail="路径规则不存在")
# 验证路径模式
if data.is_regex:
import re
try:
re.compile(data.path_pattern)
except re.error as e:
raise HTTPException(400, detail=f"无效的正则表达式: {e}")
rule.path_pattern = data.path_pattern
rule.is_regex = data.is_regex
rule.can_read = data.can_read
rule.can_write = data.can_write
rule.can_delete = data.can_delete
rule.can_share = data.can_share
rule.priority = data.priority
await rule.save()
# 清除权限缓存
PermissionService.clear_cache()
return PathRuleInfo(
id=rule.id,
role_id=rule.role_id,
path_pattern=rule.path_pattern,
is_regex=rule.is_regex,
can_read=rule.can_read,
can_write=rule.can_write,
can_delete=rule.can_delete,
can_share=rule.can_share,
priority=rule.priority,
created_at=rule.created_at,
)
@classmethod
async def delete_path_rule(cls, rule_id: int) -> None:
"""删除路径规则"""
rule = await PathRule.get_or_none(id=rule_id)
if not rule:
raise HTTPException(404, detail="路径规则不存在")
await rule.delete()
# 清除权限缓存
PermissionService.clear_cache()
@classmethod
async def ensure_system_roles(cls) -> None:
"""确保系统内置角色存在"""
system_roles = [
{
"name": SystemRoles.ADMIN,
"description": "管理员角色,拥有所有系统和适配器权限",
"is_system": True,
},
{
"name": SystemRoles.USER,
"description": "普通用户角色,需要管理员配置路径权限",
"is_system": True,
},
{
"name": SystemRoles.VIEWER,
"description": "只读用户角色,仅可查看文件",
"is_system": True,
},
]
for role_data in system_roles:
existing = await Role.get_or_none(name=role_data["name"])
if not existing:
await Role.create(**role_data)