mirror of
https://github.com/DrizzleTime/Foxel.git
synced 2026-05-07 05:02:42 +08:00
646 lines
22 KiB
Python
646 lines
22 KiB
Python
import asyncio
|
|
from dataclasses import dataclass
|
|
from typing import List, Dict, Tuple, AsyncIterator, Optional
|
|
|
|
from fastapi import HTTPException
|
|
from fastapi.responses import StreamingResponse
|
|
from ftplib import FTP, error_perm
|
|
import mimetypes
|
|
|
|
from models import StorageAdapter
|
|
|
|
|
|
def _join_remote(root: str, rel: str) -> str:
|
|
root = (root or "/").rstrip("/") or "/"
|
|
rel = (rel or "").lstrip("/")
|
|
if not rel:
|
|
return root
|
|
return f"{root}/{rel}"
|
|
|
|
|
|
def _parse_mlst_line(line: str) -> Dict[str, str]:
|
|
out: Dict[str, str] = {}
|
|
try:
|
|
facts, _, name = line.partition(" ")
|
|
for part in facts.split(";"):
|
|
if not part or "=" not in part:
|
|
continue
|
|
k, v = part.split("=", 1)
|
|
out[k.strip().lower()] = v.strip()
|
|
if name:
|
|
out["name"] = name.strip()
|
|
except Exception:
|
|
pass
|
|
return out
|
|
|
|
|
|
def _parse_modify_to_epoch(mod: str) -> int:
|
|
# Formats we may see: YYYYMMDDHHMMSS or YYYYMMDDHHMMSS(.sss)
|
|
try:
|
|
mod = mod.strip()
|
|
mod = mod.split(".")[0]
|
|
if len(mod) >= 14:
|
|
y = int(mod[0:4])
|
|
m = int(mod[4:6])
|
|
d = int(mod[6:8])
|
|
hh = int(mod[8:10])
|
|
mm = int(mod[10:12])
|
|
ss = int(mod[12:14])
|
|
import datetime as _dt
|
|
return int(_dt.datetime(y, m, d, hh, mm, ss, tzinfo=_dt.timezone.utc).timestamp())
|
|
except Exception:
|
|
return 0
|
|
return 0
|
|
|
|
|
|
@dataclass
|
|
class _Range:
|
|
start: int
|
|
end: Optional[int] # inclusive
|
|
|
|
|
|
class FTPAdapter:
|
|
def __init__(self, record: StorageAdapter):
|
|
self.record = record
|
|
cfg = record.config
|
|
self.host: str = cfg.get("host")
|
|
self.port: int = int(cfg.get("port", 21))
|
|
self.username: Optional[str] = cfg.get("username")
|
|
self.password: Optional[str] = cfg.get("password")
|
|
self.passive: bool = bool(cfg.get("passive", True))
|
|
self.timeout: int = int(cfg.get("timeout", 15))
|
|
self.root_path: str = cfg.get("root", "/") or "/"
|
|
|
|
if not self.host:
|
|
raise ValueError("FTP adapter requires 'host'")
|
|
|
|
def get_effective_root(self, sub_path: str | None) -> str:
|
|
base = self.root_path.rstrip("/") or "/"
|
|
if sub_path:
|
|
return _join_remote(base, sub_path)
|
|
return base
|
|
|
|
def _connect(self) -> FTP:
|
|
ftp = FTP()
|
|
ftp.connect(self.host, self.port, timeout=self.timeout)
|
|
if self.username:
|
|
ftp.login(self.username, self.password or "")
|
|
else:
|
|
ftp.login()
|
|
ftp.set_pasv(self.passive)
|
|
return ftp
|
|
|
|
async def list_dir(self, root: str, rel: str, page_num: int = 1, page_size: int = 50, sort_by: str = "name", sort_order: str = "asc") -> Tuple[List[Dict], int]:
|
|
path = _join_remote(root, rel.strip('/'))
|
|
|
|
def _do_list() -> List[Dict]:
|
|
ftp = self._connect()
|
|
try:
|
|
ftp.cwd(path)
|
|
except error_perm as e:
|
|
# path may be file
|
|
ftp.quit()
|
|
raise NotADirectoryError(rel) from e
|
|
|
|
entries: List[Dict] = []
|
|
# Try MLSD first
|
|
try:
|
|
for name, facts in ftp.mlsd():
|
|
if name in (".", ".."):
|
|
continue
|
|
is_dir = (facts.get("type") == "dir")
|
|
size = int(facts.get("size") or 0)
|
|
mtime = _parse_modify_to_epoch(facts.get("modify") or "")
|
|
entries.append({
|
|
"name": name,
|
|
"is_dir": is_dir,
|
|
"size": 0 if is_dir else size,
|
|
"mtime": mtime,
|
|
"type": "dir" if is_dir else "file",
|
|
})
|
|
ftp.quit()
|
|
return entries
|
|
except Exception:
|
|
# Fallback to NLST + probing
|
|
pass
|
|
|
|
names = []
|
|
try:
|
|
names = ftp.nlst()
|
|
except Exception:
|
|
ftp.quit()
|
|
return []
|
|
|
|
for name in names:
|
|
if name in (".", ".."):
|
|
continue
|
|
is_dir = False
|
|
size = 0
|
|
mtime = 0
|
|
try:
|
|
# If we can CWD, it's a directory
|
|
ftp.cwd(_join_remote(path, name))
|
|
ftp.cwd(path)
|
|
is_dir = True
|
|
except Exception:
|
|
is_dir = False
|
|
try:
|
|
size = ftp.size(_join_remote(path, name)) or 0
|
|
except Exception:
|
|
size = 0
|
|
try:
|
|
mdtm = ftp.sendcmd("MDTM " + _join_remote(path, name))
|
|
# Example: '213 20241012XXXXXX'
|
|
if mdtm.startswith("213 "):
|
|
mtime = _parse_modify_to_epoch(mdtm.split(" ", 1)[1])
|
|
except Exception:
|
|
pass
|
|
entries.append({
|
|
"name": name,
|
|
"is_dir": is_dir,
|
|
"size": 0 if is_dir else int(size or 0),
|
|
"mtime": int(mtime or 0),
|
|
"type": "dir" if is_dir else "file",
|
|
})
|
|
ftp.quit()
|
|
return entries
|
|
|
|
entries = await asyncio.to_thread(_do_list)
|
|
|
|
reverse = sort_order.lower() == "desc"
|
|
|
|
def get_sort_key(item):
|
|
key = (not item["is_dir"],)
|
|
f = sort_by.lower()
|
|
if f == "name":
|
|
key += (item["name"].lower(),)
|
|
elif f == "size":
|
|
key += (item.get("size", 0),)
|
|
elif f == "mtime":
|
|
key += (item.get("mtime", 0),)
|
|
else:
|
|
key += (item["name"].lower(),)
|
|
return key
|
|
|
|
entries.sort(key=get_sort_key, reverse=reverse)
|
|
total = len(entries)
|
|
start = (page_num - 1) * page_size
|
|
end = start + page_size
|
|
return entries[start:end], total
|
|
|
|
async def read_file(self, root: str, rel: str) -> bytes:
|
|
path = _join_remote(root, rel)
|
|
|
|
def _do_read() -> bytes:
|
|
ftp = self._connect()
|
|
try:
|
|
chunks: List[bytes] = []
|
|
ftp.retrbinary("RETR " + path, lambda b: chunks.append(b))
|
|
return b"".join(chunks)
|
|
except error_perm as e:
|
|
if str(e).startswith("550"):
|
|
raise FileNotFoundError(rel)
|
|
raise
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
return await asyncio.to_thread(_do_read)
|
|
|
|
async def write_file(self, root: str, rel: str, data: bytes):
|
|
path = _join_remote(root, rel)
|
|
|
|
def _ensure_dirs(ftp: FTP, dir_path: str):
|
|
parts = [p for p in dir_path.strip("/").split("/") if p]
|
|
cur = "/"
|
|
for p in parts:
|
|
cur = _join_remote(cur, p)
|
|
try:
|
|
ftp.mkd(cur)
|
|
except Exception:
|
|
pass
|
|
|
|
def _do_write():
|
|
ftp = self._connect()
|
|
try:
|
|
parent = "/" if "/" not in path.strip("/") else path.rsplit("/", 1)[0]
|
|
_ensure_dirs(ftp, parent)
|
|
from io import BytesIO
|
|
bio = BytesIO(data)
|
|
ftp.storbinary("STOR " + path, bio)
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.to_thread(_do_write)
|
|
|
|
async def write_upload_file(self, root: str, rel: str, file_obj, filename: str | None, file_size: int | None = None, content_type: str | None = None):
|
|
path = _join_remote(root, rel)
|
|
|
|
def _ensure_dirs(ftp: FTP, dir_path: str):
|
|
parts = [p for p in dir_path.strip("/").split("/") if p]
|
|
cur = "/"
|
|
for p in parts:
|
|
cur = _join_remote(cur, p)
|
|
try:
|
|
ftp.mkd(cur)
|
|
except Exception:
|
|
pass
|
|
|
|
def _do_upload():
|
|
ftp = self._connect()
|
|
try:
|
|
parent = "/" if "/" not in path.strip("/") else path.rsplit("/", 1)[0]
|
|
_ensure_dirs(ftp, parent)
|
|
try:
|
|
if callable(getattr(file_obj, "seek", None)):
|
|
file_obj.seek(0)
|
|
except Exception:
|
|
pass
|
|
ftp.storbinary("STOR " + path, file_obj)
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.to_thread(_do_upload)
|
|
return {"size": file_size or 0}
|
|
|
|
async def write_file_stream(self, root: str, rel: str, data_iter: AsyncIterator[bytes]):
|
|
# KISS: 聚合后一次性写入
|
|
buf = bytearray()
|
|
async for chunk in data_iter:
|
|
if chunk:
|
|
buf.extend(chunk)
|
|
await self.write_file(root, rel, bytes(buf))
|
|
return len(buf)
|
|
|
|
async def mkdir(self, root: str, rel: str):
|
|
path = _join_remote(root, rel)
|
|
|
|
def _do_mkdir():
|
|
ftp = self._connect()
|
|
try:
|
|
parts = [p for p in path.strip("/").split("/") if p]
|
|
cur = "/"
|
|
for p in parts:
|
|
cur = _join_remote(cur, p)
|
|
try:
|
|
ftp.mkd(cur)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.to_thread(_do_mkdir)
|
|
|
|
async def delete(self, root: str, rel: str):
|
|
path = _join_remote(root, rel)
|
|
|
|
def _do_delete():
|
|
ftp = self._connect()
|
|
try:
|
|
# Try file delete
|
|
try:
|
|
ftp.delete(path)
|
|
return
|
|
except Exception:
|
|
pass
|
|
|
|
# Recursively delete dir
|
|
def _rm_tree(dir_path: str):
|
|
try:
|
|
ftp.cwd(dir_path)
|
|
except Exception:
|
|
return
|
|
items = []
|
|
try:
|
|
for name, facts in ftp.mlsd():
|
|
if name in (".", ".."):
|
|
continue
|
|
items.append((name, facts.get("type") == "dir"))
|
|
except Exception:
|
|
try:
|
|
names = ftp.nlst()
|
|
except Exception:
|
|
names = []
|
|
for n in names:
|
|
if n in (".", ".."):
|
|
continue
|
|
# Best-effort dir check
|
|
try:
|
|
ftp.cwd(_join_remote(dir_path, n))
|
|
ftp.cwd(dir_path)
|
|
items.append((n, True))
|
|
except Exception:
|
|
items.append((n, False))
|
|
for n, is_dir in items:
|
|
child = _join_remote(dir_path, n)
|
|
if is_dir:
|
|
_rm_tree(child)
|
|
else:
|
|
try:
|
|
ftp.delete(child)
|
|
except Exception:
|
|
pass
|
|
try:
|
|
ftp.rmd(dir_path)
|
|
except Exception:
|
|
pass
|
|
|
|
_rm_tree(path)
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.to_thread(_do_delete)
|
|
|
|
async def move(self, root: str, src_rel: str, dst_rel: str):
|
|
src = _join_remote(root, src_rel)
|
|
dst = _join_remote(root, dst_rel)
|
|
|
|
def _do_move():
|
|
ftp = self._connect()
|
|
try:
|
|
# Ensure dst parent exists
|
|
parent = "/" if "/" not in dst.strip("/") else dst.rsplit("/", 1)[0]
|
|
parts = [p for p in parent.strip("/").split("/") if p]
|
|
cur = "/"
|
|
for p in parts:
|
|
cur = _join_remote(cur, p)
|
|
try:
|
|
ftp.mkd(cur)
|
|
except Exception:
|
|
pass
|
|
ftp.rename(src, dst)
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.to_thread(_do_move)
|
|
|
|
async def rename(self, root: str, src_rel: str, dst_rel: str):
|
|
await self.move(root, src_rel, dst_rel)
|
|
|
|
async def copy(self, root: str, src_rel: str, dst_rel: str, overwrite: bool = False):
|
|
src = _join_remote(root, src_rel)
|
|
dst = _join_remote(root, dst_rel)
|
|
|
|
# naive implementation: download then upload; recursively for dirs
|
|
async def _is_dir(path: str) -> bool:
|
|
def _probe() -> bool:
|
|
ftp = self._connect()
|
|
try:
|
|
try:
|
|
ftp.cwd(path)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
return await asyncio.to_thread(_probe)
|
|
|
|
if await _is_dir(src):
|
|
# list children, create dst dir, copy recursively
|
|
await self.mkdir(root, dst_rel)
|
|
|
|
children, _ = await self.list_dir(root, src_rel, page_num=1, page_size=10_000)
|
|
for ent in children:
|
|
child_src = f"{src_rel.rstrip('/')}/{ent['name']}"
|
|
child_dst = f"{dst_rel.rstrip('/')}/{ent['name']}"
|
|
await self.copy(root, child_src, child_dst, overwrite)
|
|
return
|
|
|
|
# file
|
|
data = await self.read_file(root, src_rel)
|
|
if not overwrite:
|
|
# best-effort existence check
|
|
try:
|
|
await self.stat_file(root, dst_rel)
|
|
raise FileExistsError(dst_rel)
|
|
except FileNotFoundError:
|
|
pass
|
|
await self.write_file(root, dst_rel, data)
|
|
|
|
async def stat_file(self, root: str, rel: str):
|
|
path = _join_remote(root, rel)
|
|
|
|
def _do_stat():
|
|
ftp = self._connect()
|
|
try:
|
|
# Try MLST
|
|
try:
|
|
resp: List[str] = []
|
|
ftp.retrlines("MLST " + path, resp.append)
|
|
# The last line usually contains facts
|
|
facts = {}
|
|
if resp:
|
|
facts = _parse_mlst_line(resp[-1])
|
|
name = rel.split("/")[-1]
|
|
t = facts.get("type") or "file"
|
|
is_dir = t == "dir"
|
|
size = int(facts.get("size") or 0)
|
|
mtime = _parse_modify_to_epoch(facts.get("modify") or "")
|
|
return {
|
|
"name": name,
|
|
"is_dir": is_dir,
|
|
"size": 0 if is_dir else size,
|
|
"mtime": mtime,
|
|
"type": "dir" if is_dir else "file",
|
|
"path": path,
|
|
}
|
|
except Exception:
|
|
pass
|
|
|
|
# Probe directory
|
|
try:
|
|
ftp.cwd(path)
|
|
return {
|
|
"name": rel.split("/")[-1],
|
|
"is_dir": True,
|
|
"size": 0,
|
|
"mtime": 0,
|
|
"type": "dir",
|
|
"path": path,
|
|
}
|
|
except Exception:
|
|
pass
|
|
|
|
# Treat as file
|
|
try:
|
|
size = ftp.size(path) or 0
|
|
except Exception:
|
|
size = 0
|
|
try:
|
|
mdtm = ftp.sendcmd("MDTM " + path)
|
|
mtime = _parse_modify_to_epoch(mdtm.split(" ", 1)[1]) if mdtm.startswith("213 ") else 0
|
|
except Exception:
|
|
mtime = 0
|
|
return {
|
|
"name": rel.split("/")[-1],
|
|
"is_dir": False,
|
|
"size": int(size or 0),
|
|
"mtime": int(mtime or 0),
|
|
"type": "file",
|
|
"path": path,
|
|
}
|
|
except error_perm as e:
|
|
if str(e).startswith("550"):
|
|
raise FileNotFoundError(rel)
|
|
raise
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
return await asyncio.to_thread(_do_stat)
|
|
|
|
async def stream_file(self, root: str, rel: str, range_header: str | None):
|
|
path = _join_remote(root, rel)
|
|
# Get size (best-effort)
|
|
def _get_size() -> Optional[int]:
|
|
ftp = self._connect()
|
|
try:
|
|
try:
|
|
return int(ftp.size(path) or 0)
|
|
except Exception:
|
|
return None
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
total_size = await asyncio.to_thread(_get_size)
|
|
mime, _ = mimetypes.guess_type(rel)
|
|
content_type = mime or "application/octet-stream"
|
|
|
|
rng: Optional[_Range] = None
|
|
status = 200
|
|
headers = {"Accept-Ranges": "bytes", "Content-Type": content_type}
|
|
if range_header and range_header.startswith("bytes=") and total_size is not None:
|
|
try:
|
|
s, e = (range_header.removeprefix("bytes=").split("-", 1))
|
|
start = int(s) if s.strip() else 0
|
|
end = int(e) if e.strip() else (total_size - 1)
|
|
if start >= total_size:
|
|
raise HTTPException(416, detail="Requested Range Not Satisfiable")
|
|
if end >= total_size:
|
|
end = total_size - 1
|
|
rng = _Range(start, end)
|
|
status = 206
|
|
headers["Content-Range"] = f"bytes {start}-{end}/{total_size}"
|
|
headers["Content-Length"] = str(end - start + 1)
|
|
except ValueError:
|
|
raise HTTPException(400, detail="Invalid Range header")
|
|
elif total_size is not None:
|
|
headers["Content-Length"] = str(total_size)
|
|
|
|
queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue(maxsize=8)
|
|
|
|
class _Stop(Exception):
|
|
pass
|
|
|
|
def _worker():
|
|
ftp = self._connect()
|
|
remaining = None
|
|
if rng is not None:
|
|
remaining = (rng.end - rng.start + 1) if rng.end is not None else None
|
|
|
|
def _cb(chunk: bytes):
|
|
nonlocal remaining
|
|
if not chunk:
|
|
return
|
|
try:
|
|
if remaining is not None:
|
|
if len(chunk) > remaining:
|
|
part = chunk[:remaining]
|
|
queue.put_nowait(part)
|
|
remaining = 0
|
|
raise _Stop()
|
|
else:
|
|
queue.put_nowait(chunk)
|
|
remaining -= len(chunk)
|
|
if remaining <= 0:
|
|
raise _Stop()
|
|
else:
|
|
queue.put_nowait(chunk)
|
|
except _Stop:
|
|
raise
|
|
except Exception:
|
|
# queue full or event loop closed
|
|
raise _Stop()
|
|
|
|
try:
|
|
if rng is not None:
|
|
ftp.retrbinary("RETR " + path, _cb, rest=rng.start)
|
|
else:
|
|
ftp.retrbinary("RETR " + path, _cb)
|
|
queue.put_nowait(None)
|
|
except _Stop:
|
|
try:
|
|
queue.put_nowait(None)
|
|
except Exception:
|
|
pass
|
|
except error_perm as e:
|
|
try:
|
|
queue.put_nowait(None)
|
|
except Exception:
|
|
pass
|
|
if str(e).startswith("550"):
|
|
pass
|
|
finally:
|
|
try:
|
|
ftp.quit()
|
|
except Exception:
|
|
pass
|
|
|
|
async def agen():
|
|
worker_fut = asyncio.to_thread(_worker)
|
|
try:
|
|
while True:
|
|
chunk = await queue.get()
|
|
if chunk is None:
|
|
break
|
|
yield chunk
|
|
finally:
|
|
try:
|
|
await worker_fut
|
|
except Exception:
|
|
pass
|
|
|
|
return StreamingResponse(agen(), status_code=status, headers=headers, media_type=content_type)
|
|
|
|
|
|
ADAPTER_TYPE = "ftp"
|
|
|
|
CONFIG_SCHEMA = [
|
|
{"key": "host", "label": "主机", "type": "string", "required": True, "placeholder": "ftp.example.com"},
|
|
{"key": "port", "label": "端口", "type": "number", "required": False, "default": 21},
|
|
{"key": "username", "label": "用户名", "type": "string", "required": False},
|
|
{"key": "password", "label": "密码", "type": "password", "required": False},
|
|
{"key": "passive", "label": "被动模式", "type": "boolean", "required": False, "default": True},
|
|
{"key": "timeout", "label": "超时(秒)", "type": "number", "required": False, "default": 15},
|
|
{"key": "root", "label": "根路径", "type": "string", "required": False, "default": "/"},
|
|
]
|
|
|
|
|
|
def ADAPTER_FACTORY(rec: StorageAdapter):
|
|
return FTPAdapter(rec)
|