use aligo

This commit is contained in:
jxxghp
2024-09-25 18:44:18 +08:00
parent 41d41685fe
commit 2da95fa4e6
8 changed files with 203 additions and 634 deletions

View File

@@ -118,7 +118,7 @@ class Rclone(StorageBase):
logger.error(f"rclone浏览文件失败{err}")
return []
def create_folder(self, fileitm: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
def create_folder(self, fileitem: schemas.FileItem, name: str) -> Optional[schemas.FileItem]:
"""
创建目录
"""
@@ -126,13 +126,13 @@ class Rclone(StorageBase):
retcode = subprocess.run(
[
'rclone', 'mkdir',
f'MP:{fileitm.path}/{name}'
f'MP:{fileitem.path}/{name}'
],
startupinfo=self.__get_hidden_shell()
).returncode
if retcode == 0:
ret_fileitem = copy.deepcopy(fileitm)
ret_fileitem.path = f"{fileitm.path}/{name}/"
ret_fileitem = copy.deepcopy(fileitem)
ret_fileitem.path = f"{fileitem.path}/{name}/"
ret_fileitem.name = name
return ret_fileitem
except Exception as err:
@@ -191,7 +191,7 @@ class Rclone(StorageBase):
logger.error(f"rclone获取文件失败{err}")
return None
def delete(self, fileitm: schemas.FileItem) -> bool:
def delete(self, fileitem: schemas.FileItem) -> bool:
"""
删除文件
"""
@@ -199,7 +199,7 @@ class Rclone(StorageBase):
retcode = subprocess.run(
[
'rclone', 'deletefile',
f'MP:{fileitm.path}'
f'MP:{fileitem.path}'
],
startupinfo=self.__get_hidden_shell()
).returncode
@@ -209,7 +209,7 @@ class Rclone(StorageBase):
logger.error(f"rclone删除文件失败{err}")
return False
def rename(self, fileitm: schemas.FileItem, name: str) -> bool:
def rename(self, fileitem: schemas.FileItem, name: str) -> bool:
"""
重命名文件
"""
@@ -217,8 +217,8 @@ class Rclone(StorageBase):
retcode = subprocess.run(
[
'rclone', 'moveto',
f'MP:{fileitm.path}',
f'MP:{Path(fileitm.path).parent}/{name}'
f'MP:{fileitem.path}',
f'MP:{Path(fileitem.path).parent}/{name}'
],
startupinfo=self.__get_hidden_shell()
).returncode
@@ -228,11 +228,11 @@ class Rclone(StorageBase):
logger.error(f"rclone重命名文件失败{err}")
return False
def download(self, fileitem: schemas.FileItem) -> Optional[Path]:
def download(self, fileitem: schemas.FileItem, path: Path = None) -> Optional[Path]:
"""
下载文件
"""
path = settings.TEMP_PATH / fileitem.name
path = (path or settings.TEMP_PATH) / fileitem.name
try:
retcode = subprocess.run(
[
@@ -248,7 +248,7 @@ class Rclone(StorageBase):
logger.error(f"rclone复制文件失败{err}")
return None
def upload(self, fileitm: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
def upload(self, fileitem: schemas.FileItem, path: Path) -> Optional[schemas.FileItem]:
"""
上传文件
"""
@@ -256,7 +256,7 @@ class Rclone(StorageBase):
retcode = subprocess.run(
[
'rclone', 'copyto',
fileitm.path,
fileitem.path,
f'MP:{path}'
],
startupinfo=self.__get_hidden_shell()
@@ -267,7 +267,7 @@ class Rclone(StorageBase):
logger.error(f"rclone上传文件失败{err}")
return None
def detail(self, fileitm: schemas.FileItem) -> Optional[schemas.FileItem]:
def detail(self, fileitem: schemas.FileItem) -> Optional[schemas.FileItem]:
"""
获取文件详情
"""
@@ -275,7 +275,7 @@ class Rclone(StorageBase):
ret = subprocess.run(
[
'rclone', 'lsjson',
f'MP:{fileitm.path}'
f'MP:{fileitem.path}'
],
capture_output=True,
startupinfo=self.__get_hidden_shell()
@@ -287,7 +287,7 @@ class Rclone(StorageBase):
logger.error(f"rclone获取文件详情失败{err}")
return None
def move(self, fileitm: schemas.FileItem, target: Path) -> bool:
def move(self, fileitem: schemas.FileItem, target: Path) -> bool:
"""
移动文件target_file格式rclone:path
"""
@@ -295,7 +295,7 @@ class Rclone(StorageBase):
retcode = subprocess.run(
[
'rclone', 'moveto',
f'MP:{fileitm.path}',
f'MP:{fileitem.path}',
f'MP:{target}'
],
startupinfo=self.__get_hidden_shell()
@@ -306,13 +306,13 @@ class Rclone(StorageBase):
logger.error(f"rclone移动文件失败{err}")
return False
def copy(self, fileitm: schemas.FileItem, target_file: Path) -> bool:
def copy(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass
def link(self, fileitm: schemas.FileItem, target_file: Path) -> bool:
def link(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass
def softlink(self, fileitm: schemas.FileItem, target_file: Path) -> bool:
def softlink(self, fileitem: schemas.FileItem, target_file: Path) -> bool:
pass
def usage(self) -> Optional[schemas.StorageUsage]: