add storage

This commit is contained in:
jxxghp
2024-06-30 08:59:12 +08:00
parent 77632880d1
commit 63ca5ee313
24 changed files with 367 additions and 194 deletions

View File

@@ -0,0 +1,75 @@
from abc import ABCMeta, abstractmethod
from pathlib import Path
from typing import Optional, List, Any
from app import schemas
class StorageBase(metaclass=ABCMeta):
"""
存储基类
"""
@abstractmethod
def check(self) -> bool:
"""
检查存储是否可用
"""
pass
@abstractmethod
def list(self, fileitm: schemas.FileItem) -> Optional[List[schemas.FileItem]]:
"""
浏览文件
"""
pass
@abstractmethod
def create_folder(self, fileitm: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
pass
@abstractmethod
def delete(self, fileitm: schemas.FileItem) -> bool:
"""
删除文件
"""
pass
@abstractmethod
def rename(self, fileitm: schemas.FileItem, name: str) -> bool:
"""
重命名文件
"""
pass
@abstractmethod
def download(self, fileitm: schemas.FileItem) -> Any:
"""
下载链接
"""
pass
@abstractmethod
def move(self, fileitm: schemas.FileItem, target_dir: schemas.FileItem) -> bool:
"""
移动文件
"""
pass
@abstractmethod
def upload(self, fileitm: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
"""
上传文件
"""
pass
@abstractmethod
def detail(self, fileitm: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
"""
pass

View File

@@ -0,0 +1,627 @@
import base64
import json
import time
import uuid
from pathlib import Path
from typing import Optional, Tuple, List
from requests import Response
from app import schemas
from app.core.config import settings
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.modules.filetransfer.storage import StorageBase
from app.schemas.types import SystemConfigKey
from app.utils.http import RequestUtils
from app.utils.string import StringUtils
from app.utils.system import SystemUtils
class AliPan(StorageBase):
"""
阿里云相关操作
"""
_X_SIGNATURE = ('f4b7bed5d8524a04051bd2da876dd79afe922b8205226d65855d02b267422adb1'
'e0d8a816b021eaf5c36d101892180f79df655c5712b348c2a540ca136e6b22001')
_X_PUBLIC_KEY = ('04d9d2319e0480c840efeeb75751b86d0db0c5b9e72c6260a1d846958adceaf9d'
'ee789cab7472741d23aafc1a9c591f72e7ee77578656e6c8588098dea1488ac2a')
# 生成二维码
qrcode_url = ("https://passport.aliyundrive.com/newlogin/qrcode/generate.do?"
"appName=aliyun_drive&fromSite=52&appEntrance=web&isMobile=false"
"&lang=zh_CN&returnUrl=&bizParams=&_bx-v=2.0.31")
# 二维码登录确认
check_url = "https://passport.aliyundrive.com/newlogin/qrcode/query.do?appName=aliyun_drive&fromSite=52&_bx-v=2.0.31"
# 更新访问令牌
update_accessstoken_url = "https://auth.aliyundrive.com/v2/account/token"
# 创建会话
create_session_url = "https://api.aliyundrive.com/users/v1/users/device/create_session"
# 用户信息
user_info_url = "https://user.aliyundrive.com/v2/user/get"
# 浏览文件
list_file_url = "https://api.aliyundrive.com/adrive/v3/file/list"
# 创建目录或文件
create_folder_file_url = "https://api.aliyundrive.com/adrive/v2/file/createWithFolders"
# 文件详情
file_detail_url = "https://api.aliyundrive.com/v2/file/get"
# 删除文件
delete_file_url = " https://api.aliyundrive.com/v2/recyclebin/trash"
# 文件重命名
rename_file_url = "https://api.aliyundrive.com/v3/file/update"
# 获取下载链接
download_url = "https://api.aliyundrive.com/v2/file/get_download_url"
# 移动文件
move_file_url = "https://api.aliyundrive.com/v2/file/move"
# 上传文件完成
upload_file_complete_url = "https://api.aliyundrive.com/v2/file/complete"
def __init__(self):
self.systemconfig = SystemConfigOper()
def __handle_error(self, res: Response, apiname: str, action: bool = True):
"""
统一处理和打印错误信息
"""
if res is None:
logger.warn("无法连接到阿里云盘!")
return
try:
result = res.json()
except Exception as err:
logger.error(f"解析阿里云盘返回数据失败:{str(err)}")
return
code = result.get("code")
message = result.get("message")
display_message = result.get("display_message")
if code or message:
logger.warn(f"Aliyun {apiname}失败:{code} - {display_message or message}")
if action:
if code == "DeviceSessionSignatureInvalid":
logger.warn("设备已失效,正在重新建立会话...")
self.__create_session(self.__get_headers(self.__auth_params))
if code == "UserDeviceOffline":
logger.warn("设备已离线,尝试重新登录,如仍报错请检查阿里云盘绑定设备数量是否超限!")
self.__create_session(self.__get_headers(self.__auth_params))
if code == "AccessTokenInvalid":
logger.warn("访问令牌已失效,正在刷新令牌...")
self.__update_accesstoken(self.__auth_params, self.__auth_params.get("refreshToken"))
else:
logger.info(f"Aliyun {apiname}成功")
@property
def __auth_params(self):
"""
获取阿里云盘认证参数并初始化参数格式
"""
return self.systemconfig.get(SystemConfigKey.UserAliyunParams) or {}
def __update_params(self, params: dict):
"""
设置阿里云盘认证参数
"""
current_params = self.__auth_params
current_params.update(params)
self.systemconfig.set(SystemConfigKey.UserAliyunParams, current_params)
def __clear_params(self):
"""
清除阿里云盘认证参数
"""
self.systemconfig.delete(SystemConfigKey.UserAliyunParams)
def generate_qrcode(self) -> Optional[Tuple[dict, str]]:
"""
生成二维码
"""
res = RequestUtils(timeout=10).get_res(self.qrcode_url)
if res:
data = res.json().get("content", {}).get("data")
return {
"codeContent": data.get("codeContent"),
"ck": data.get("ck"),
"t": data.get("t")
}, ""
elif res is not None:
self.__handle_error(res, "生成二维码")
return {}, f"请求阿里云盘二维码失败:{res.status_code} - {res.reason}"
return {}, f"请求阿里云盘二维码失败:无法连接!"
def check_login(self, ck: str, t: str) -> Optional[Tuple[dict, str]]:
"""
二维码登录确认
"""
params = {
"t": t,
"ck": ck,
"appName": "aliyun_drive",
"appEntrance": "web",
"isMobile": "false",
"lang": "zh_CN",
"returnUrl": "",
"fromSite": "52",
"bizParams": "",
"navlanguage": "zh-CN",
"navPlatform": "MacIntel",
}
body = "&".join([f"{key}={value}" for key, value in params.items()])
status = {
"NEW": "请用阿里云盘 App 扫码",
"SCANED": "请在手机上确认",
"EXPIRED": "二维码已过期",
"CANCELED": "已取消",
"CONFIRMED": "已确认",
}
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
}
res = RequestUtils(headers=headers, timeout=5).post_res(self.check_url, data=body)
if res:
data = res.json().get("content", {}).get("data") or {}
qrCodeStatus = data.get("qrCodeStatus")
data["tip"] = status.get(qrCodeStatus) or "未知"
if data.get("bizExt"):
try:
bizExt = json.loads(base64.b64decode(data["bizExt"]).decode('GBK'))
pds_login_result = bizExt.get("pds_login_result")
if pds_login_result:
data.pop('bizExt')
data.update({
'userId': pds_login_result.get('userId'),
'expiresIn': pds_login_result.get('expiresIn'),
'nickName': pds_login_result.get('nickName'),
'avatar': pds_login_result.get('avatar'),
'tokenType': pds_login_result.get('tokenType'),
"refreshToken": pds_login_result.get('refreshToken'),
"accessToken": pds_login_result.get('accessToken'),
"defaultDriveId": pds_login_result.get('defaultDriveId'),
"updateTime": time.time(),
})
self.__update_params(data)
self.user_info()
except Exception as e:
return {}, f"bizExt 解码失败:{str(e)}"
return data, ""
elif res is not None:
self.__handle_error(res, "登录确认")
return {}, f"阿里云盘登录确认失败:{res.status_code} - {res.reason}"
return {}, "阿里云盘登录确认失败:无法连接!"
def __update_accesstoken(self, params: dict, refresh_token: str) -> bool:
"""
更新阿里云盘访问令牌
"""
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(
self.update_accessstoken_url, json={
"refresh_token": refresh_token,
"grant_type": "refresh_token"
})
if res:
data = res.json()
code = data.get("code")
if code in ["RefreshTokenExpired", "InvalidParameter.RefreshToken"]:
logger.warn("刷新令牌已过期,请重新登录!")
self.__clear_params()
return False
self.__update_params({
"accessToken": data.get('access_token'),
"expiresIn": data.get('expires_in'),
"updateTime": time.time()
})
logger.info(f"阿里云盘访问令牌已更新accessToken={data.get('access_token')}")
return True
else:
self.__handle_error(res, "更新令牌", action=False)
return False
def __create_session(self, headers: dict):
"""
创建会话
"""
def __os_name():
"""
获取操作系统名称
"""
if SystemUtils.is_windows():
return 'Windows 操作系统'
elif SystemUtils.is_macos():
return 'MacOS 操作系统'
else:
return '类 Unix 操作系统'
res = RequestUtils(headers=headers, timeout=5).post_res(self.create_session_url, json={
'deviceName': f'MoviePilot {SystemUtils.platform}',
'modelName': __os_name(),
'pubKey': self._X_PUBLIC_KEY,
})
self.__handle_error(res, "创建会话", action=False)
@property
def __access_params(self) -> Optional[dict]:
"""
获取阿里云盘访问参数,如果超时则更新后返回
"""
params = self.__auth_params
if not params:
logger.warn("阿里云盘访问令牌不存在,请先扫码登录!")
return None
expires_in = params.get("expiresIn")
update_time = params.get("updateTime")
refresh_token = params.get("refreshToken")
if not expires_in or not update_time or not refresh_token:
logger.warn("阿里云盘访问令牌参数错误,请重新扫码登录!")
self.__clear_params()
return None
# 是否需要更新设备信息
update_device = False
# 判断访问令牌是否过期
if (time.time() - update_time) >= expires_in:
logger.info("阿里云盘访问令牌已过期,正在更新...")
if not self.__update_accesstoken(params, refresh_token):
# 更新失败
return None
update_device = True
# 生成设备ID
x_device_id = params.get("x_device_id")
if not x_device_id:
x_device_id = uuid.uuid4().hex
params['x_device_id'] = x_device_id
self.__update_params({"x_device_id": x_device_id})
update_device = True
# 更新设备信息重新创建会话
if update_device:
self.__create_session(self.__get_headers(params))
return params
def __get_headers(self, params: dict):
"""
获取请求头
"""
if not params:
return {}
return {
"Authorization": f"Bearer {params.get('accessToken')}",
"Content-Type": "application/json;charset=UTF-8",
"Accept": "application/json, text/plain, */*",
"Referer": "https://www.alipan.com/",
"User-Agent": settings.USER_AGENT,
"X-Canary": "client=web,app=adrive,version=v4.9.0",
"x-device-id": params.get('x_device_id'),
"x-signature": self._X_SIGNATURE
}
def check(self) -> bool:
"""
检查存储是否可用
"""
pass
def user_info(self) -> dict:
"""
获取用户信息drive_id等
"""
params = self.__access_params
if not params:
return {}
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.user_info_url)
if res:
result = res.json()
self.__update_params({
"resourceDriveId": result.get("resource_drive_id"),
"backDriveId": result.get("backup_drive_id")
})
return result
else:
self.__handle_error(res, "获取用户信息")
return {}
def list(self, drive_id: str = None, parent_file_id: str = 'root', list_type: str = None,
limit: int = 100, order_by: str = 'updated_at', path: str = "/") -> List[schemas.FileItem]:
"""
浏览文件
limit 返回文件数量,默认 50最大 100
order_by created_at/updated_at/name/size
parent_file_id 根目录为root
type all | file | folder
"""
params = self.__access_params
if not params:
return []
# 请求头
headers = self.__get_headers(params)
# 根目录处理
if not drive_id:
return [
schemas.FileItem(
fileid=parent_file_id,
drive_id=params.get("resourceDriveId"),
parent_fileid="root",
type="dir",
path="/资源库/",
name="资源库"
),
schemas.FileItem(
fileid=parent_file_id,
drive_id=params.get("backDriveId"),
parent_fileid="root",
type="dir",
path="/备份盘/",
name="备份盘"
)
]
# 返回数据
ret_items = []
# 分页获取
next_marker = None
while True:
if not parent_file_id or parent_file_id == "/":
parent_file_id = "root"
res = RequestUtils(headers=headers, timeout=10).post_res(self.list_file_url, json={
"drive_id": drive_id,
"type": list_type,
"limit": limit,
"order_by": order_by,
"parent_file_id": parent_file_id,
"marker": next_marker
}, params={
'jsonmask': ('next_marker,items(name,file_id,drive_id,type,size,created_at,updated_at,'
'category,file_extension,parent_file_id,mime_type,starred,thumbnail,url,'
'streams_info,content_hash,user_tags,user_meta,trashed,video_media_metadata,'
'video_preview_metadata,sync_meta,sync_device_flag,sync_flag,punish_flag')
})
if res:
result = res.json()
items = result.get("items")
if not items:
break
# 合并数据
ret_items.extend(items)
next_marker = result.get("next_marker")
if not next_marker:
# 没有下一页
break
else:
self.__handle_error(res, "浏览文件")
break
return [schemas.FileItem(
fileid=fileinfo.get("file_id"),
parent_fileid=fileinfo.get("parent_file_id"),
type="dir" if fileinfo.get("type") == "folder" else "file",
path=f"{path}{fileinfo.get('name')}" + ("/" if fileinfo.get("type") == "folder" else ""),
name=fileinfo.get("name"),
size=fileinfo.get("size"),
extension=fileinfo.get("file_extension"),
modify_time=StringUtils.str_to_timestamp(fileinfo.get("updated_at")),
thumbnail=fileinfo.get("thumbnail"),
drive_id=fileinfo.get("drive_id"),
) for fileinfo in ret_items]
def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
params = self.__access_params
if not params:
return None
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.create_folder_file_url, json={
"drive_id": fileitem.drive_id,
"parent_file_id": fileitem.parent_fileid,
"name": name,
"check_name_mode": "refuse",
"type": "folder"
})
if res:
"""
{
"parent_file_id": "root",
"type": "folder",
"file_id": "6673f2c8a88344741bd64ad192d7512b92087719",
"domain_id": "bj29",
"drive_id": "39146740",
"file_name": "test",
"encrypt_mode": "none"
}
"""
result = res.json()
return schemas.FileItem(
fileid=result.get("file_id"),
drive_id=result.get("drive_id"),
parent_fileid=result.get("parent_file_id"),
type=result.get("type"),
name=result.get("file_name"),
path=f"{fileitem.path}{result.get('file_name')}",
)
else:
self.__handle_error(res, "创建目录")
return None
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件
"""
params = self.__access_params
if not params:
return False
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.delete_file_url, json={
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid
})
if res:
return True
else:
self.__handle_error(res, "删除文件")
return False
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
"""
params = self.__access_params
if not params:
return None
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.file_detail_url, json={
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid
})
if res:
result = res.json()
return schemas.FileItem(
fileid=result.get("file_id"),
drive_id=result.get("drive_id"),
parent_fileid=result.get("parent_file_id"),
type="file",
name=result.get("name"),
size=result.get("size"),
extension=result.get("file_extension"),
modify_time=StringUtils.str_to_timestamp(result.get("updated_at")),
thumbnail=result.get("thumbnail"),
path=f"{fileitem.path}{result.get('name')}"
)
else:
self.__handle_error(res, "获取文件详情")
return None
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
"""
重命名文件
"""
params = self.__access_params
if not params:
return False
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.rename_file_url, json={
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid,
"name": name,
"check_name_mode": "refuse"
})
if res:
return True
else:
self.__handle_error(res, "重命名文件")
return False
def download(self, fileitem: schemas.FileItem) -> Optional[str]:
"""
获取下载链接
"""
params = self.__access_params
if not params:
return None
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.download_url, json={
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid
})
if res:
return res.json().get("url")
else:
self.__handle_error(res, "获取下载链接")
return None
def move(self, fileitem: schemas.FileItem, target_dir: schemas.FileItem) -> bool:
"""
移动文件
"""
params = self.__access_params
if not params:
return False
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.move_file_url, json={
"drive_id": fileitem.drive_id,
"file_id": fileitem.fileid,
"to_parent_file_id": target_dir.fileid,
"check_name_mode": "refuse"
})
if res:
return True
else:
self.__handle_error(res, "移动文件")
return False
def upload(self, fileitem: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
"""
上传文件,并标记完成
"""
params = self.__access_params
if not params:
return None
headers = self.__get_headers(params)
res = RequestUtils(headers=headers, timeout=10).post_res(self.create_folder_file_url, json={
"drive_id": fileitem.drive_id,
"parent_file_id": fileitem.parent_fileid,
"name": path.name,
"check_name_mode": "refuse",
"create_scene": "file_upload",
"type": "file",
"part_info_list": [
{
"part_number": 1
}
],
"size": path.stat().st_size
})
if not res:
self.__handle_error(res, "创建文件")
return None
# 获取上传参数
result = res.json()
if result.get("exist"):
logger.info(f"文件{result.get('file_name')}已存在,无需上传")
return schemas.FileItem(
drive_id=result.get("drive_id"),
fileid=result.get("file_id"),
parent_fileid=result.get("parent_file_id"),
type="file",
name=result.get("file_name"),
path=f"{fileitem.path}{result.get('file_name')}"
)
file_id = result.get("file_id")
upload_id = result.get("upload_id")
part_info_list = result.get("part_info_list")
if part_info_list:
# 上传地址
upload_url = part_info_list[0].get("upload_url")
# 上传文件
res = RequestUtils(headers={
"Content-Type": "",
"User-Agent": settings.USER_AGENT,
"Referer": "https://www.alipan.com/",
"Accept": "*/*",
}).put_res(upload_url, data=path.read_bytes())
if not res:
self.__handle_error(res, "上传文件")
return None
# 标记文件上传完毕
res = RequestUtils(headers=headers, timeout=10).post_res(self.upload_file_complete_url, json={
"drive_id": fileitem.drive_id,
"file_id": file_id,
"upload_id": upload_id
})
if not res:
self.__handle_error(res, "标记上传状态")
return None
result = res.json()
return schemas.FileItem(
fileid=result.get("file_id"),
drive_id=result.get("drive_id"),
parent_fileid=result.get("parent_file_id"),
type="file",
name=result.get("name"),
path=f"{fileitem.path}{result.get('name')}",
)
else:
logger.warn("上传文件失败:无法获取上传地址!")
return None

View File

@@ -0,0 +1,200 @@
import shutil
from pathlib import Path
from typing import Optional, List
from starlette.responses import FileResponse, Response
from app import schemas
from app.log import logger
from app.modules.filetransfer.storage import StorageBase
from app.utils.system import SystemUtils
class LocalStorage(StorageBase):
"""
本地文件操作
"""
def check(self) -> bool:
"""
检查存储是否可用
"""
return True
def list(self, fileitem: schemas.FileItem) -> Optional[List[schemas.FileItem]]:
"""
浏览文件
"""
# 返回结果
ret_items = []
path = fileitem.path
if not fileitem.path or fileitem.path == "/":
if SystemUtils.is_windows():
partitions = SystemUtils.get_windows_drives() or ["C:/"]
for partition in partitions:
ret_items.append(schemas.FileItem(
type="dir",
path=partition + "/",
name=partition,
basename=partition
))
return ret_items
else:
path = "/"
else:
if SystemUtils.is_windows():
path = path.lstrip("/")
elif not path.startswith("/"):
path = "/" + path
# 遍历目录
path_obj = Path(path)
if not path_obj.exists():
logger.warn(f"目录不存在:{path}")
return []
# 如果是文件
if path_obj.is_file():
ret_items.append(schemas.FileItem(
type="file",
path=str(path_obj).replace("\\", "/"),
name=path_obj.name,
basename=path_obj.stem,
extension=path_obj.suffix[1:],
size=path_obj.stat().st_size,
modify_time=path_obj.stat().st_mtime,
))
return ret_items
# 扁历所有目录
for item in SystemUtils.list_sub_directory(path_obj):
ret_items.append(schemas.FileItem(
type="dir",
path=str(item).replace("\\", "/") + "/",
name=item.name,
basename=item.stem,
modify_time=item.stat().st_mtime,
))
# 遍历所有文件,不含子目录
for item in SystemUtils.list_sub_all(path_obj):
ret_items.append(schemas.FileItem(
type="file",
path=str(item).replace("\\", "/"),
name=item.name,
basename=item.stem,
extension=item.suffix[1:],
size=item.stat().st_size,
modify_time=item.stat().st_mtime,
))
return ret_items
def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
if not fileitem.path:
return None
path_obj = Path(fileitem.path) / name
if path_obj.exists():
return None
path_obj.mkdir(parents=True, exist_ok=True)
return schemas.FileItem(
type="dir",
path=str(path_obj).replace("\\", "/") + "/",
name=name,
basename=name,
modify_time=path_obj.stat().st_mtime,
)
def detail(self, fileitm: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
"""
path_obj = Path(fileitm.path)
return schemas.FileItem(
type="file",
path=str(path_obj).replace("\\", "/"),
name=path_obj.name,
basename=path_obj.stem,
extension=path_obj.suffix[1:],
size=path_obj.stat().st_size,
modify_time=path_obj.stat().st_mtime,
)
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件
"""
if not fileitem.path:
return False
path_obj = Path(fileitem.path)
if not path_obj.exists():
return False
if path_obj.is_file():
path_obj.unlink()
else:
shutil.rmtree(path_obj, ignore_errors=True)
return True
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
"""
重命名文件
"""
path_obj = Path(fileitem.path)
if not path_obj.exists():
return False
path_obj.rename(path_obj.parent / name)
def download(self, fileitem: schemas.FileItem) -> Optional[Response]:
"""
下载文件
"""
if not fileitem.path:
return None
path_obj = Path(fileitem.path)
if not path_obj.exists():
return None
if path_obj.is_file():
# 做为文件流式下载
return FileResponse(path_obj)
else:
# 做为压缩包下载
shutil.make_archive(base_name=path_obj.stem, format="zip", root_dir=path_obj)
reponse = Response(content=path_obj.read_bytes(), media_type="application/zip")
# 删除压缩包
Path(f"{path_obj.stem}.zip").unlink()
return reponse
def move(self, fileitem: schemas.FileItem, target_dir: schemas.FileItem) -> bool:
"""
移动文件
"""
if not fileitem.path or not target_dir.path:
return False
path_obj = Path(fileitem.path)
target_obj = Path(target_dir.path)
if not path_obj.exists() or not target_obj.exists():
return False
path_obj.rename(target_obj / path_obj.name)
return True
def upload(self, fileitem: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
"""
上传文件
"""
if not fileitem.path:
return None
path_obj = Path(fileitem.path)
if not path_obj.exists():
return None
shutil.copy(path, path_obj / path.name)
return schemas.FileItem(
type="file",
path=str(path_obj / path.name).replace("\\", "/"),
name=path.name,
basename=path.stem,
extension=path.suffix[1:],
size=path.stat().st_size,
modify_time=path.stat().st_mtime,
)

View File

@@ -0,0 +1,294 @@
import base64
from pathlib import Path
from typing import Optional, Tuple, List
import oss2
import py115
from py115 import Cloud
from py115.types import LoginTarget, QrcodeSession, QrcodeStatus, Credential, DownloadTicket
from app import schemas
from app.db.systemconfig_oper import SystemConfigOper
from app.log import logger
from app.modules.filetransfer.storage import StorageBase
from app.schemas.types import SystemConfigKey
from app.utils.singleton import Singleton
class U115Pan(StorageBase, metaclass=Singleton):
"""
115相关操作
"""
cloud: Optional[Cloud] = None
_session: QrcodeSession = None
def __init__(self):
self.systemconfig = SystemConfigOper()
def __init_cloud(self) -> bool:
"""
初始化Cloud
"""
credential = self.__credential
if not credential:
logger.warn("115未登录请先登录")
return False
try:
if not self.cloud:
self.cloud = py115.connect(credential)
except Exception as err:
logger.error(f"115连接失败请重新扫码登录{str(err)}")
self.__clear_credential()
return False
return True
@property
def __credential(self) -> Optional[Credential]:
"""
获取已保存的115认证参数
"""
cookie_dict = self.systemconfig.get(SystemConfigKey.User115Params)
if not cookie_dict:
return None
return Credential.from_dict(cookie_dict)
def __save_credentail(self, credential: Credential):
"""
设置115认证参数
"""
self.systemconfig.set(SystemConfigKey.User115Params, credential.to_dict())
def __clear_credential(self):
"""
清除115认证参数
"""
self.systemconfig.delete(SystemConfigKey.User115Params)
def generate_qrcode(self) -> Optional[str]:
"""
生成二维码
"""
try:
self.cloud = py115.connect()
self._session = self.cloud.qrcode_login(LoginTarget.Web)
image_bin = self._session.image_data
if not image_bin:
logger.warn("115生成二维码失败未获取到二维码数据")
return None
# 转换为base64图片格式
image_base64 = base64.b64encode(image_bin).decode()
return f"data:image/png;base64,{image_base64}"
except Exception as e:
logger.warn(f"115生成二维码失败{str(e)}")
return None
def check_login(self) -> Optional[Tuple[dict, str]]:
"""
二维码登录确认
"""
if not self._session:
return {}, "请先生成二维码!"
try:
if not self.cloud:
return {}, "请先生成二维码!"
status = self.cloud.qrcode_poll(self._session)
if status == QrcodeStatus.Done:
# 确认完成,保存认证信息
self.__save_credentail(self.cloud.export_credentail())
result = {
"status": 1,
"tip": "登录成功!"
}
elif status == QrcodeStatus.Waiting:
result = {
"status": 0,
"tip": "请使用微信或115客户端扫码"
}
elif status == QrcodeStatus.Expired:
result = {
"status": -1,
"tip": "二维码已过期,请重新刷新!"
}
self.cloud = None
elif status == QrcodeStatus.Failed:
result = {
"status": -2,
"tip": "登录失败,请重试!"
}
self.cloud = None
else:
result = {
"status": -3,
"tip": "未知错误,请重试!"
}
self.cloud = None
return result, ""
except Exception as e:
return {}, f"115登录确认失败{str(e)}"
def storage(self) -> Optional[Tuple[int, int]]:
"""
获取存储空间
"""
if not self.__init_cloud():
return None
try:
return self.cloud.storage().space()
except Exception as e:
logger.error(f"获取115存储空间失败{str(e)}")
return None
def check(self) -> bool:
"""
检查存储是否可用
"""
pass
def list(self, fileitem: schemas.FileItem) -> Optional[List[schemas.FileItem]]:
"""
浏览文件
"""
if not self.__init_cloud():
return None
try:
items = self.cloud.storage().list(dir_id=fileitem.parent_fileid)
return [schemas.FileItem(
fileid=item.file_id,
parent_fileid=item.parent_id,
type="dir" if item.is_dir else "file",
path=f"{fileitem.path}{item.name}" + ("/" if item.is_dir else ""),
name=item.name,
size=item.size,
extension=Path(item.name).suffix[1:],
modify_time=item.modified_time.timestamp() if item.modified_time else 0,
pickcode=item.pickcode
) for item in items]
except Exception as e:
logger.error(f"浏览115文件失败{str(e)}")
return None
def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
if not self.__init_cloud():
return None
try:
result = self.cloud.storage().make_dir(fileitem.parent_fileid, name)
return schemas.FileItem(
fileid=result.file_id,
parent_fileid=result.parent_id,
type="dir",
path=f"{fileitem.path}{name}/",
name=name,
modify_time=result.modified_time.timestamp() if result.modified_time else 0,
pickcode=result.pickcode
)
except Exception as e:
logger.error(f"创建115目录失败{str(e)}")
return None
def detail(self, fileitm: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
"""
pass
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件
"""
if not self.__init_cloud():
return False
try:
self.cloud.storage().delete(fileitem.fileid)
return True
except Exception as e:
logger.error(f"删除115文件失败{str(e)}")
return False
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
"""
重命名文件
"""
if not self.__init_cloud():
return False
try:
self.cloud.storage().rename(fileitem.fileid, name)
return True
except Exception as e:
logger.error(f"重命名115文件失败{str(e)}")
return False
def download(self, fileitem: schemas.FileItem) -> Optional[DownloadTicket]:
"""
获取下载链接
"""
if not self.__init_cloud():
return None
try:
return self.cloud.storage().request_download(fileitem.pickcode)
except Exception as e:
logger.error(f"115下载失败{str(e)}")
return None
def move(self, fileitem: schemas.FileItem, target_dir: schemas.FileItem) -> bool:
"""
移动文件
"""
if not self.__init_cloud():
return False
try:
self.cloud.storage().move(fileitem.fileid, target_dir.fileid)
return True
except Exception as e:
logger.error(f"移动115文件失败{str(e)}")
return False
def upload(self, fileitem: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
"""
上传文件
"""
if not self.__init_cloud():
return None
try:
ticket = self.cloud.storage().request_upload(dir_id=fileitem.fileid, file_path=str(path))
if ticket is None:
logger.warn(f"115请求上传出错")
return None
elif ticket.is_done:
logger.warn(f"115请求上传失败文件已存在")
return None
else:
auth = oss2.Auth(**ticket.oss_token)
bucket = oss2.Bucket(
auth=auth,
endpoint=ticket.oss_endpoint,
bucket_name=ticket.bucket_name,
)
por = bucket.put_object_from_file(
key=ticket.object_key,
filename=str(path),
headers=ticket.headers,
)
result = por.resp.response.json()
if result:
fileitem = result.get('data')
logger.info(f"115上传文件成功{fileitem}")
return schemas.FileItem(
fileid=fileitem.get('file_id'),
parent_fileid=fileitem.fileid,
type="file",
name=fileitem.get('file_name'),
path=f"{fileitem.path}{fileitem.get('file_name')}",
size=fileitem.get('file_size'),
extension=Path(fileitem.get('file_name')).suffix[1:],
pickcode=fileitem.get('pickcode')
)
else:
logger.warn(f"115上传文件失败{por.resp.response.text}")
return None
except Exception as e:
logger.error(f"上传115文件失败{str(e)}")
return None