Files
Foxel/api/routes/share.py

206 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import List, Optional
from urllib.parse import quote
from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel
from api.response import success
from services.auth import User, get_current_active_user
from services.share import share_service
from services.virtual_fs import stream_file, stat_file
from models.database import ShareLink, UserAccount
# Router for the share endpoints that take a share token instead of a
# logged-in user; every route on it lives under /api/s.
public_router = APIRouter(prefix="/api/s", tags=["Share - Public"])
# Router for the share management endpoints (create / list / delete), whose
# handlers depend on the current active user; routes live under /api/shares.
router = APIRouter(prefix="/api/shares", tags=["Share - Management"])
class ShareCreate(BaseModel):
    # Request body accepted by the share-creation endpoint.
    # (Kept as `#` comments so nothing extra is emitted into the model schema.)

    # Display name of the share.
    name: str
    # Paths included in the share.
    paths: List[str]
    # Lifetime of the link in days; defaults to 7.
    expires_in_days: Optional[int] = 7
    # Access mode; defaults to "public". The handlers in this module also
    # recognise "password".
    access_type: str = "public"
    # Plain-text password supplied by the creator, if any.
    password: Optional[str] = None
class ShareInfo(BaseModel):
    # Serialisable view of a share link as returned by the API.
    # Timestamps are carried as ISO-8601 strings rather than datetimes.
    id: int
    token: str
    name: str
    paths: List[str]
    created_at: str
    expires_at: Optional[str] = None
    access_type: str

    @classmethod
    def from_orm(cls, obj: ShareLink):
        """Build a ``ShareInfo`` from a ``ShareLink`` record.

        ``created_at`` is always converted with ``isoformat()``;
        ``expires_at`` is converted the same way when set and is ``None``
        otherwise.
        """
        expiry = obj.expires_at
        field_values = {
            "id": obj.id,
            "token": obj.token,
            "name": obj.name,
            "paths": obj.paths,
            "created_at": obj.created_at.isoformat(),
            "expires_at": expiry.isoformat() if expiry else None,
            "access_type": obj.access_type,
        }
        return cls(**field_values)
class ShareInfoWithPassword(ShareInfo):
    # ShareInfo plus an optional plain-text password; used as the response
    # model of the share-creation endpoint.
    password: Optional[str] = None
# --- Management Routes ---
@router.post("", response_model=ShareInfoWithPassword)
async def create_share(
    payload: ShareCreate,
    current_user: User = Depends(get_current_active_user),
):
    """
    Create a new share link owned by the current user.

    The response is the serialised share; the plain-text password is echoed
    back only when the share is password-protected and a password was given.
    """
    owner = await UserAccount.get(id=current_user.id)
    link = await share_service.create_share_link(
        user=owner,
        name=payload.name,
        paths=payload.paths,
        expires_in_days=payload.expires_in_days,
        access_type=payload.access_type,
        password=payload.password,
    )

    body = ShareInfo.from_orm(link).model_dump()
    is_password_share = payload.access_type == "password"
    if is_password_share and payload.password:
        body["password"] = payload.password
    return body
@router.get("", response_model=List[ShareInfo])
async def get_my_shares(current_user: User = Depends(get_current_active_user)):
    """
    Return every share link that belongs to the current user.
    """
    owner = await UserAccount.get(id=current_user.id)
    owned_links = await share_service.get_user_shares(user=owner)
    return list(map(ShareInfo.from_orm, owned_links))
@router.delete("/{share_id}")
async def delete_share(
    share_id: int,
    current_user: User = Depends(get_current_active_user),
):
    """
    Delete one of the current user's share links.

    Deletion is delegated to ``share_service.delete_share_link``; a success
    envelope is returned once that call completes.
    """
    await share_service.delete_share_link(share_id=share_id, user=current_user)
    confirmation = success(msg="分享已取消")
    return confirmation
# --- Public Routes ---
class SharePassword(BaseModel):
    # Request body of the password-verification endpoint.
    # Candidate password to check against the share.
    password: str
@public_router.post("/{token}/verify")
async def verify_password(token: str, payload: SharePassword):
    """
    Check a candidate password against a password-protected share.

    Responds with 400 when the share is not password-protected and with 403
    when the password does not match.
    """
    link = await share_service.get_share_by_token(token)

    needs_password = link.access_type == "password"
    if not needs_password:
        raise HTTPException(status_code=400, detail="此分享不需要密码")

    password_matches = share_service._verify_password(
        payload.password, link.hashed_password
    )
    if not password_matches:
        raise HTTPException(status_code=403, detail="密码错误")

    # A time-limited token for follow-up requests could be issued here, but
    # to keep things simple the frontend is expected to send the password
    # (or a session identifier) with every request, so this endpoint only
    # reports success.
    return success(msg="验证成功")
@public_router.get("/{token}/ls")
async def list_share_content(token: str, path: str = "/", password: Optional[str] = None):
    """
    List the files and directories found at ``path`` inside a share.

    For password-protected shares a missing password yields 401 and a wrong
    one yields 403.
    """
    link = await share_service.get_share_by_token(token)

    password_required = link.access_type == "password"
    if password_required and not password:
        raise HTTPException(status_code=401, detail="需要密码")
    if password_required and not share_service._verify_password(
        password, link.hashed_password
    ):
        raise HTTPException(status_code=403, detail="密码错误")

    listing = await share_service.get_shared_item_details(link, path)

    # Missing keys in the service result fall back to these defaults.
    pagination = {
        "total": listing.get("total", 0),
        "page": listing.get("page", 1),
        "page_size": listing.get("page_size", 1),
        "pages": listing.get("pages", 1),
    }
    return success({
        "path": path,
        "entries": listing.get("items", []),
        "pagination": pagination,
    })
@public_router.get("/{token}")
async def get_share_info(token: str):
    """
    Return the metadata of the share identified by ``token``.
    """
    link = await share_service.get_share_by_token(token)
    info = ShareInfo.from_orm(link)
    return success(info)
@public_router.get("/{token}/download")
async def download_shared_file(token: str, path: str, request: Request, password: Optional[str] = None):
    """
    Download a single file from a share as an attachment.

    Args:
        token: Token identifying the share link.
        path: Requested file path, relative to the shared root. For a
            single-file share it must equal the shared file's name.
        request: Incoming request; only its ``Range`` header is read.
        password: Share password; required when the share's access type is
            ``"password"``.

    Returns:
        The response produced by ``stream_file`` with a
        ``Content-Disposition: attachment`` header added.

    Raises:
        HTTPException: 400 for an empty, root, or ``..``-containing path;
            401 when a required password is missing; 403 for a wrong
            password or a path outside the share; 404 when the share has no
            source path or the source cannot be stat'ed.
    """
    # Untrusted input: treat backslashes as separators as well, so a
    # "..\\" segment cannot slip past the parent-directory check.
    segments = path.replace("\\", "/").split("/") if path else []
    if not path or path == "/" or ".." in segments:
        raise HTTPException(status_code=400, detail="无效的文件路径")

    share = await share_service.get_share_by_token(token)
    if share.access_type == "password":
        if not password:
            raise HTTPException(status_code=401, detail="需要密码")
        if not share_service._verify_password(password, share.hashed_password):
            raise HTTPException(status_code=403, detail="密码错误")

    # A share without any stored path has nothing to serve; answer 404
    # instead of failing with an IndexError (HTTP 500) below.
    if not share.paths:
        raise HTTPException(status_code=404, detail="分享的源文件不存在")
    # NOTE(review): only the first shared path is served here, even though
    # a share can be created with several paths.
    base_shared_path = share.paths[0]

    # Work out whether the share root is a file or a directory.
    try:
        stat = await stat_file(base_shared_path)
    except HTTPException as e:
        detail = str(e.detail)
        if "Path is a directory" in detail or "Not a file" in detail:
            is_dir = True
        else:
            # The shared path itself doesn't exist, which is an issue.
            raise HTTPException(status_code=404, detail="分享的源文件不存在") from e
    else:
        is_dir = bool(stat and stat.get("is_dir"))

    if is_dir:
        # Directory share: join the requested path onto the shared root.
        shared_root = base_shared_path.rstrip("/")
        full_virtual_path = f"{shared_root}/{path.lstrip('/')}"
        # Defensive invariant: the result must sit below the shared root.
        # Comparing against "<root>/" (not the raw base) keeps a sibling such
        # as "<root>-other" from matching and does not reject shares whose
        # stored base carries extra trailing slashes.
        if not full_virtual_path.startswith(f"{shared_root}/"):
            raise HTTPException(status_code=403, detail="无权访问此路径")
    else:
        # File share: the only downloadable path is the shared file itself.
        shared_filename = base_shared_path.split("/")[-1]
        if shared_filename != path.lstrip("/"):
            raise HTTPException(status_code=403, detail="无权访问此路径")
        full_virtual_path = base_shared_path

    range_header = request.headers.get("Range")
    response = await stream_file(full_virtual_path, range_header)

    # Force a download; the name is percent-encoded for the RFC 5987
    # ``filename*`` form so non-ASCII names survive.
    filename = full_virtual_path.split("/")[-1]
    response.headers["Content-Disposition"] = f"attachment; filename*=UTF-8''{quote(filename)}"
    return response