from __future__ import annotations import asyncio from dataclasses import dataclass from typing import List, Dict, Tuple, AsyncIterator, Optional from fastapi import HTTPException from fastapi.responses import StreamingResponse from ftplib import FTP, error_perm import mimetypes from models import StorageAdapter from services.logging import LogService def _join_remote(root: str, rel: str) -> str: root = (root or "/").rstrip("/") or "/" rel = (rel or "").lstrip("/") if not rel: return root return f"{root}/{rel}" def _parse_mlst_line(line: str) -> Dict[str, str]: out: Dict[str, str] = {} try: facts, _, name = line.partition(" ") for part in facts.split(";"): if not part or "=" not in part: continue k, v = part.split("=", 1) out[k.strip().lower()] = v.strip() if name: out["name"] = name.strip() except Exception: pass return out def _parse_modify_to_epoch(mod: str) -> int: # Formats we may see: YYYYMMDDHHMMSS or YYYYMMDDHHMMSS(.sss) try: mod = mod.strip() mod = mod.split(".")[0] if len(mod) >= 14: y = int(mod[0:4]) m = int(mod[4:6]) d = int(mod[6:8]) hh = int(mod[8:10]) mm = int(mod[10:12]) ss = int(mod[12:14]) import datetime as _dt return int(_dt.datetime(y, m, d, hh, mm, ss, tzinfo=_dt.timezone.utc).timestamp()) except Exception: return 0 return 0 @dataclass class _Range: start: int end: Optional[int] # inclusive class FTPAdapter: def __init__(self, record: StorageAdapter): self.record = record cfg = record.config self.host: str = cfg.get("host") self.port: int = int(cfg.get("port", 21)) self.username: Optional[str] = cfg.get("username") self.password: Optional[str] = cfg.get("password") self.passive: bool = bool(cfg.get("passive", True)) self.timeout: int = int(cfg.get("timeout", 15)) self.root_path: str = cfg.get("root", "/") or "/" if not self.host: raise ValueError("FTP adapter requires 'host'") def get_effective_root(self, sub_path: str | None) -> str: base = self.root_path.rstrip("/") or "/" if sub_path: return _join_remote(base, sub_path) return base def _connect(self) -> FTP: ftp = FTP() ftp.connect(self.host, self.port, timeout=self.timeout) if self.username: ftp.login(self.username, self.password or "") else: ftp.login() ftp.set_pasv(self.passive) return ftp async def list_dir(self, root: str, rel: str, page_num: int = 1, page_size: int = 50, sort_by: str = "name", sort_order: str = "asc") -> Tuple[List[Dict], int]: path = _join_remote(root, rel.strip('/')) def _do_list() -> List[Dict]: ftp = self._connect() try: ftp.cwd(path) except error_perm as e: # path may be file ftp.quit() raise NotADirectoryError(rel) from e entries: List[Dict] = [] # Try MLSD first try: for name, facts in ftp.mlsd(): if name in (".", ".."): continue is_dir = (facts.get("type") == "dir") size = int(facts.get("size") or 0) mtime = _parse_modify_to_epoch(facts.get("modify") or "") entries.append({ "name": name, "is_dir": is_dir, "size": 0 if is_dir else size, "mtime": mtime, "type": "dir" if is_dir else "file", }) ftp.quit() return entries except Exception: # Fallback to NLST + probing pass names = [] try: names = ftp.nlst() except Exception: ftp.quit() return [] for name in names: if name in (".", ".."): continue is_dir = False size = 0 mtime = 0 try: # If we can CWD, it's a directory ftp.cwd(_join_remote(path, name)) ftp.cwd(path) is_dir = True except Exception: is_dir = False try: size = ftp.size(_join_remote(path, name)) or 0 except Exception: size = 0 try: mdtm = ftp.sendcmd("MDTM " + _join_remote(path, name)) # Example: '213 20241012XXXXXX' if mdtm.startswith("213 "): mtime = _parse_modify_to_epoch(mdtm.split(" ", 1)[1]) except Exception: pass entries.append({ "name": name, "is_dir": is_dir, "size": 0 if is_dir else int(size or 0), "mtime": int(mtime or 0), "type": "dir" if is_dir else "file", }) ftp.quit() return entries entries = await asyncio.to_thread(_do_list) reverse = sort_order.lower() == "desc" def get_sort_key(item): key = (not item["is_dir"],) f = sort_by.lower() if f == "name": key += (item["name"].lower(),) elif f == "size": key += (item.get("size", 0),) elif f == "mtime": key += (item.get("mtime", 0),) else: key += (item["name"].lower(),) return key entries.sort(key=get_sort_key, reverse=reverse) total = len(entries) start = (page_num - 1) * page_size end = start + page_size return entries[start:end], total async def read_file(self, root: str, rel: str) -> bytes: path = _join_remote(root, rel) def _do_read() -> bytes: ftp = self._connect() try: chunks: List[bytes] = [] ftp.retrbinary("RETR " + path, lambda b: chunks.append(b)) return b"".join(chunks) except error_perm as e: if str(e).startswith("550"): raise FileNotFoundError(rel) raise finally: try: ftp.quit() except Exception: pass return await asyncio.to_thread(_do_read) async def write_file(self, root: str, rel: str, data: bytes): path = _join_remote(root, rel) def _ensure_dirs(ftp: FTP, dir_path: str): parts = [p for p in dir_path.strip("/").split("/") if p] cur = "/" for p in parts: cur = _join_remote(cur, p) try: ftp.mkd(cur) except Exception: pass def _do_write(): ftp = self._connect() try: parent = "/" if "/" not in path.strip("/") else path.rsplit("/", 1)[0] _ensure_dirs(ftp, parent) from io import BytesIO bio = BytesIO(data) ftp.storbinary("STOR " + path, bio) finally: try: ftp.quit() except Exception: pass await asyncio.to_thread(_do_write) await LogService.info( "adapter:ftp", f"Wrote file to {rel}", details={"adapter_id": self.record.id, "path": path, "size": len(data)}, ) async def write_file_stream(self, root: str, rel: str, data_iter: AsyncIterator[bytes]): # KISS: 聚合后一次性写入 buf = bytearray() async for chunk in data_iter: if chunk: buf.extend(chunk) await self.write_file(root, rel, bytes(buf)) return len(buf) async def mkdir(self, root: str, rel: str): path = _join_remote(root, rel) def _do_mkdir(): ftp = self._connect() try: parts = [p for p in path.strip("/").split("/") if p] cur = "/" for p in parts: cur = _join_remote(cur, p) try: ftp.mkd(cur) except Exception: pass finally: try: ftp.quit() except Exception: pass await asyncio.to_thread(_do_mkdir) await LogService.info("adapter:ftp", f"Created directory {rel}", details={"adapter_id": self.record.id, "path": path}) async def delete(self, root: str, rel: str): path = _join_remote(root, rel) def _do_delete(): ftp = self._connect() try: # Try file delete try: ftp.delete(path) return except Exception: pass # Recursively delete dir def _rm_tree(dir_path: str): try: ftp.cwd(dir_path) except Exception: return items = [] try: for name, facts in ftp.mlsd(): if name in (".", ".."): continue items.append((name, facts.get("type") == "dir")) except Exception: try: names = ftp.nlst() except Exception: names = [] for n in names: if n in (".", ".."): continue # Best-effort dir check try: ftp.cwd(_join_remote(dir_path, n)) ftp.cwd(dir_path) items.append((n, True)) except Exception: items.append((n, False)) for n, is_dir in items: child = _join_remote(dir_path, n) if is_dir: _rm_tree(child) else: try: ftp.delete(child) except Exception: pass try: ftp.rmd(dir_path) except Exception: pass _rm_tree(path) finally: try: ftp.quit() except Exception: pass await asyncio.to_thread(_do_delete) await LogService.info("adapter:ftp", f"Deleted {rel}", details={"adapter_id": self.record.id, "path": path}) async def move(self, root: str, src_rel: str, dst_rel: str): src = _join_remote(root, src_rel) dst = _join_remote(root, dst_rel) def _do_move(): ftp = self._connect() try: # Ensure dst parent exists parent = "/" if "/" not in dst.strip("/") else dst.rsplit("/", 1)[0] parts = [p for p in parent.strip("/").split("/") if p] cur = "/" for p in parts: cur = _join_remote(cur, p) try: ftp.mkd(cur) except Exception: pass ftp.rename(src, dst) finally: try: ftp.quit() except Exception: pass await asyncio.to_thread(_do_move) await LogService.info("adapter:ftp", f"Moved {src_rel} to {dst_rel}", details={"adapter_id": self.record.id, "src": src, "dst": dst}) async def rename(self, root: str, src_rel: str, dst_rel: str): await self.move(root, src_rel, dst_rel) async def copy(self, root: str, src_rel: str, dst_rel: str, overwrite: bool = False): src = _join_remote(root, src_rel) dst = _join_remote(root, dst_rel) # naive implementation: download then upload; recursively for dirs async def _is_dir(path: str) -> bool: def _probe() -> bool: ftp = self._connect() try: try: ftp.cwd(path) return True except Exception: return False finally: try: ftp.quit() except Exception: pass return await asyncio.to_thread(_probe) if await _is_dir(src): # list children, create dst dir, copy recursively await self.mkdir(root, dst_rel) children, _ = await self.list_dir(root, src_rel, page_num=1, page_size=10_000) for ent in children: child_src = f"{src_rel.rstrip('/')}/{ent['name']}" child_dst = f"{dst_rel.rstrip('/')}/{ent['name']}" await self.copy(root, child_src, child_dst, overwrite) await LogService.info( "adapter:ftp", f"Copied directory {src_rel} to {dst_rel}", details={"adapter_id": self.record.id, "src": src, "dst": dst} ) return # file data = await self.read_file(root, src_rel) if not overwrite: # best-effort existence check try: await self.stat_file(root, dst_rel) raise FileExistsError(dst_rel) except FileNotFoundError: pass await self.write_file(root, dst_rel, data) await LogService.info("adapter:ftp", f"Copied {src_rel} to {dst_rel}", details={"adapter_id": self.record.id, "src": src, "dst": dst}) async def stat_file(self, root: str, rel: str): path = _join_remote(root, rel) def _do_stat(): ftp = self._connect() try: # Try MLST try: resp: List[str] = [] ftp.retrlines("MLST " + path, resp.append) # The last line usually contains facts facts = {} if resp: facts = _parse_mlst_line(resp[-1]) name = rel.split("/")[-1] t = facts.get("type") or "file" is_dir = t == "dir" size = int(facts.get("size") or 0) mtime = _parse_modify_to_epoch(facts.get("modify") or "") return { "name": name, "is_dir": is_dir, "size": 0 if is_dir else size, "mtime": mtime, "type": "dir" if is_dir else "file", "path": path, } except Exception: pass # Probe directory try: ftp.cwd(path) return { "name": rel.split("/")[-1], "is_dir": True, "size": 0, "mtime": 0, "type": "dir", "path": path, } except Exception: pass # Treat as file try: size = ftp.size(path) or 0 except Exception: size = 0 try: mdtm = ftp.sendcmd("MDTM " + path) mtime = _parse_modify_to_epoch(mdtm.split(" ", 1)[1]) if mdtm.startswith("213 ") else 0 except Exception: mtime = 0 return { "name": rel.split("/")[-1], "is_dir": False, "size": int(size or 0), "mtime": int(mtime or 0), "type": "file", "path": path, } except error_perm as e: if str(e).startswith("550"): raise FileNotFoundError(rel) raise finally: try: ftp.quit() except Exception: pass return await asyncio.to_thread(_do_stat) async def stream_file(self, root: str, rel: str, range_header: str | None): path = _join_remote(root, rel) # Get size (best-effort) def _get_size() -> Optional[int]: ftp = self._connect() try: try: return int(ftp.size(path) or 0) except Exception: return None finally: try: ftp.quit() except Exception: pass total_size = await asyncio.to_thread(_get_size) mime, _ = mimetypes.guess_type(rel) content_type = mime or "application/octet-stream" rng: Optional[_Range] = None status = 200 headers = {"Accept-Ranges": "bytes", "Content-Type": content_type} if range_header and range_header.startswith("bytes=") and total_size is not None: try: s, e = (range_header.removeprefix("bytes=").split("-", 1)) start = int(s) if s.strip() else 0 end = int(e) if e.strip() else (total_size - 1) if start >= total_size: raise HTTPException(416, detail="Requested Range Not Satisfiable") if end >= total_size: end = total_size - 1 rng = _Range(start, end) status = 206 headers["Content-Range"] = f"bytes {start}-{end}/{total_size}" headers["Content-Length"] = str(end - start + 1) except ValueError: raise HTTPException(400, detail="Invalid Range header") elif total_size is not None: headers["Content-Length"] = str(total_size) queue: asyncio.Queue[Optional[bytes]] = asyncio.Queue(maxsize=8) class _Stop(Exception): pass def _worker(): ftp = self._connect() remaining = None if rng is not None: remaining = (rng.end - rng.start + 1) if rng.end is not None else None def _cb(chunk: bytes): nonlocal remaining if not chunk: return try: if remaining is not None: if len(chunk) > remaining: part = chunk[:remaining] queue.put_nowait(part) remaining = 0 raise _Stop() else: queue.put_nowait(chunk) remaining -= len(chunk) if remaining <= 0: raise _Stop() else: queue.put_nowait(chunk) except _Stop: raise except Exception: # queue full or event loop closed raise _Stop() try: if rng is not None: ftp.retrbinary("RETR " + path, _cb, rest=rng.start) else: ftp.retrbinary("RETR " + path, _cb) queue.put_nowait(None) except _Stop: try: queue.put_nowait(None) except Exception: pass except error_perm as e: try: queue.put_nowait(None) except Exception: pass if str(e).startswith("550"): pass finally: try: ftp.quit() except Exception: pass async def agen(): worker_fut = asyncio.to_thread(_worker) try: while True: chunk = await queue.get() if chunk is None: break yield chunk finally: try: await worker_fut except Exception: pass return StreamingResponse(agen(), status_code=status, headers=headers, media_type=content_type) ADAPTER_TYPE = "ftp" CONFIG_SCHEMA = [ {"key": "host", "label": "主机", "type": "string", "required": True, "placeholder": "ftp.example.com"}, {"key": "port", "label": "端口", "type": "number", "required": False, "default": 21}, {"key": "username", "label": "用户名", "type": "string", "required": False}, {"key": "password", "label": "密码", "type": "password", "required": False}, {"key": "passive", "label": "被动模式", "type": "boolean", "required": False, "default": True}, {"key": "timeout", "label": "超时(秒)", "type": "number", "required": False, "default": 15}, {"key": "root", "label": "根路径", "type": "string", "required": False, "default": "/"}, ] def ADAPTER_FACTORY(rec: StorageAdapter): return FTPAdapter(rec)