mirror of
https://github.com/DrizzleTime/Foxel.git
synced 2026-05-07 05:42:56 +08:00
29 lines
1.4 KiB
Python
29 lines
1.4 KiB
Python
from typing import List, Dict, Protocol, runtime_checkable, Tuple, AsyncIterator
|
|
from models import StorageAdapter
|
|
|
|
# 约定:任意新适配器模块需定义:
|
|
# ADAPTER_TYPE: str
|
|
# CONFIG_SCHEMA: List[Dict]
|
|
# ADAPTER_FACTORY: Callable[[StorageAdapter], BaseAdapter] (可省略, 会自动寻找 *Adapter 类)
|
|
|
|
@runtime_checkable
|
|
class BaseAdapter(Protocol):
|
|
record: StorageAdapter
|
|
async def list_dir(self, root: str, rel: str, page_num: int = 1, page_size: int = 50, sort_by: str = "name", sort_order: str = "asc") -> Tuple[List[Dict], int]: ...
|
|
async def read_file(self, root: str, rel: str) -> bytes: ...
|
|
async def write_file(self, root: str, rel: str, data: bytes): ...
|
|
async def write_file_stream(self, root: str, rel: str, data_iter: AsyncIterator[bytes]): ...
|
|
async def mkdir(self, root: str, rel: str): ...
|
|
async def delete(self, root: str, rel: str): ...
|
|
async def move(self, root: str, src_rel: str, dst_rel: str): ...
|
|
async def rename(self, root: str, src_rel: str, dst_rel: str): ...
|
|
async def copy(self, root: str, src_rel: str, dst_rel: str, overwrite: bool = False): ...
|
|
async def stream_file(self, root: str, rel: str, range_header: str | None): ...
|
|
async def stat_file(self, root: str, rel: str): ...
|
|
def get_effective_root(self, sub_path: str | None) -> str: ...
|
|
|
|
|
|
@runtime_checkable
|
|
class UsageCapableAdapter(Protocol):
|
|
async def get_usage(self, root: str) -> Dict: ...
|