"""Config and system-status routes.

Exposes endpoints for downloader cookies, transcriber configuration,
Whisper model download status / triggering, and system health checks.
"""
import os
import platform
from pathlib import Path
from typing import Optional

from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel

from app.utils.response import ResponseWrapper as R
from app.utils.logger import get_logger
from app.utils.path_helper import get_model_dir
from app.services.cookie_manager import CookieConfigManager
from app.services.transcriber_config_manager import TranscriberConfigManager
from ffmpeg_helper import ensure_ffmpeg_or_raise

logger = get_logger(__name__)
router = APIRouter()
cookie_manager = CookieConfigManager()
transcriber_config_manager = TranscriberConfigManager()


class CookieUpdateRequest(BaseModel):
    """Request body for updating the cookie of one downloader platform."""

    platform: str
    cookie: str


@router.get("/get_downloader_cookie/{platform}")
def get_cookie(platform: str):
    """Return the stored cookie for ``platform``.

    When no cookie is stored the response is still a success, carrying only
    a "not found" message and no data.
    """
    # NOTE: the parameter name is bound to the route path and shadows the
    # stdlib ``platform`` module inside this function only.
    cookie = cookie_manager.get(platform)
    if not cookie:
        return R.success(msg='未找到Cookies')
    return R.success(data={"platform": platform, "cookie": cookie})


@router.post("/update_downloader_cookie")
def update_cookie(data: CookieUpdateRequest):
    """Store the cookie supplied in the request for the given platform."""
    cookie_manager.set(data.platform, data.cookie)
    return R.success()


class TranscriberConfigRequest(BaseModel):
    """Request body for updating the transcriber configuration."""

    transcriber_type: str
    whisper_model_size: Optional[str] = None


# Transcriber back-ends offered to the client (value + display label).
AVAILABLE_TRANSCRIBER_TYPES = [
    {"value": "fast-whisper", "label": "Faster Whisper(本地)"},
    {"value": "bcut", "label": "必剪(在线)"},
    {"value": "kuaishou", "label": "快手(在线)"},
    {"value": "groq", "label": "Groq(在线)"},
    {"value": "mlx-whisper", "label": "MLX Whisper(仅macOS)"},
]

# Whisper model sizes accepted by the status and download endpoints.
WHISPER_MODEL_SIZES = ["tiny", "base", "small", "medium", "large-v3", "large-v3-turbo"]


@router.get("/transcriber_config")
def get_transcriber_config():
    """Return the current transcriber config plus the selectable options."""
    # Imported lazily, as in the rest of this module's transcriber imports.
    from app.transcriber.transcriber_provider import MLX_WHISPER_AVAILABLE

    config = transcriber_config_manager.get_config()
    return R.success(data={
        **config,
        "available_types": AVAILABLE_TRANSCRIBER_TYPES,
        "whisper_model_sizes": WHISPER_MODEL_SIZES,
        "mlx_whisper_available": MLX_WHISPER_AVAILABLE,
    })


@router.post("/transcriber_config")
def update_transcriber_config(data: TranscriberConfigRequest):
    """Persist a new transcriber configuration and return the result."""
    config = transcriber_config_manager.update_config(
        transcriber_type=data.transcriber_type,
        whisper_model_size=data.whisper_model_size,
    )
    return R.success(data=config)


# ---- Whisper model download status & download trigger ----

# Tracks download jobs for this process.
# Key: model size for faster-whisper, "mlx-<size>" for mlx-whisper.
# Value: "downloading" | "done" | "failed".
_downloading: dict[str, str] = {}


def _check_whisper_model_exists(model_size: str, subdir: str = "whisper") -> bool:
    """Return True if the given whisper model is fully downloaded locally.

    A directory alone is not proof: an interrupted download can leave an
    empty or partial directory behind. The presence of ``model.bin`` is
    used as the completion marker instead.
    """
    model_dir = get_model_dir(subdir)
    model_path = Path(os.path.join(model_dir, f"whisper-{model_size}"))
    return (model_path / "model.bin").exists()


@router.get("/transcriber_models_status")
def get_transcriber_models_status():
    """Return the download status of every whisper model size."""
    statuses = [
        {
            "model_size": size,
            "downloaded": _check_whisper_model_exists(size, "whisper"),
            "downloading": _downloading.get(size) == "downloading",
        }
        for size in WHISPER_MODEL_SIZES
    ]

    # mlx-whisper models are only reported on macOS.
    mlx_available = platform.system() == "Darwin"
    mlx_statuses = []
    if mlx_available:
        from app.transcriber.mlx_whisper_transcriber import MLX_MODEL_MAP

        # Loop-invariant: resolve the base directory once.
        model_dir = get_model_dir("mlx-whisper")
        for size in WHISPER_MODEL_SIZES:
            repo_id = MLX_MODEL_MAP.get(size)
            # Models are stored locally under their repo_id
            # (e.g. mlx-community/whisper-small-mlx).
            downloaded = bool(repo_id) and Path(os.path.join(model_dir, repo_id)).exists()
            mlx_statuses.append({
                "model_size": size,
                "downloaded": downloaded,
                "downloading": _downloading.get(f"mlx-{size}") == "downloading",
                "available": repo_id is not None,
            })

    return R.success(data={
        "whisper": statuses,
        "mlx_whisper": mlx_statuses,
        "mlx_available": mlx_available,
    })


class ModelDownloadRequest(BaseModel):
    """Request body for triggering a model download."""

    model_size: str
    transcriber_type: str = "fast-whisper"  # "fast-whisper" or "mlx-whisper"


def _do_download_whisper(model_size: str):
    """Download a faster-whisper model; intended to run as a background task.

    Progress is recorded in ``_downloading[model_size]``. Every failure,
    including a failed import of the download dependencies, ends in the
    "failed" state rather than escaping the task.
    """
    _downloading[model_size] = "downloading"
    try:
        # Imported inside the try so a missing dependency is reported as a
        # failed download instead of leaving the status stuck.
        from app.transcriber.whisper import MODEL_MAP
        from modelscope import snapshot_download

        # Use the same completeness check as the status endpoint. Checking
        # only for the directory would treat the leftovers of an interrupted
        # download as "done" and the model could never be re-downloaded.
        if _check_whisper_model_exists(model_size, "whisper"):
            _downloading[model_size] = "done"
            return

        repo_id = MODEL_MAP.get(model_size)
        if not repo_id:
            logger.error(f"未找到 whisper 模型对应的仓库: {model_size}")
            _downloading[model_size] = "failed"
            return

        model_dir = get_model_dir("whisper")
        model_path = os.path.join(model_dir, f"whisper-{model_size}")

        logger.info(f"开始下载 whisper 模型: {model_size}")
        snapshot_download(repo_id, local_dir=model_path)
        logger.info(f"whisper 模型下载完成: {model_size}")
        _downloading[model_size] = "done"
    except Exception as e:
        logger.error(f"whisper 模型下载失败: {model_size}, {e}")
        _downloading[model_size] = "failed"


def _do_download_mlx_whisper(model_size: str):
    """Download an mlx-whisper model; intended to run as a background task.

    Progress is recorded in ``_downloading["mlx-<model_size>"]``.
    """
    key = f"mlx-{model_size}"
    _downloading[key] = "downloading"
    try:
        from huggingface_hub import snapshot_download as hf_download
        from app.transcriber.mlx_whisper_transcriber import resolve_mlx_repo_id

        try:
            repo_id = resolve_mlx_repo_id(model_size)
        except ValueError as e:
            logger.error(str(e))
            _downloading[key] = "failed"
            return

        model_dir = get_model_dir("mlx-whisper")
        model_path = os.path.join(model_dir, repo_id)
        # NOTE(review): a directory-only check has the same interrupted-
        # download caveat as faster-whisper; the file that marks a complete
        # MLX model is not visible from this module — confirm and tighten.
        if Path(model_path).exists():
            _downloading[key] = "done"
            return

        logger.info(f"开始下载 mlx-whisper 模型: {model_size} ← {repo_id}")
        hf_download(repo_id, local_dir=model_path, local_dir_use_symlinks=False)
        logger.info(f"mlx-whisper 模型下载完成: {model_size}")
        _downloading[key] = "done"
    except Exception as e:
        logger.error(f"mlx-whisper 模型下载失败: {model_size}, {e}")
        _downloading[key] = "failed"


@router.post("/transcriber_download")
def download_transcriber_model(data: ModelDownloadRequest, background_tasks: BackgroundTasks):
    """Schedule a background download of the requested whisper model.

    Returns an error response for an unsupported model size, or for
    mlx-whisper on a non-macOS host. If the same model is already being
    downloaded, no new task is scheduled.
    """
    if data.model_size not in WHISPER_MODEL_SIZES:
        return R.error(msg=f"不支持的模型大小: {data.model_size}")

    if data.transcriber_type == "mlx-whisper":
        if platform.system() != "Darwin":
            return R.error(msg="MLX Whisper 仅支持 macOS")
        key = f"mlx-{data.model_size}"
        worker = _do_download_mlx_whisper
    else:
        # Any other transcriber_type falls back to faster-whisper.
        key = data.model_size
        worker = _do_download_whisper

    if _downloading.get(key) == "downloading":
        return R.success(msg="模型正在下载中")

    # Mark the job as in progress at enqueue time. The background task only
    # starts after the response is sent, so without this a second request
    # arriving in between would schedule a duplicate download.
    _downloading[key] = "downloading"
    background_tasks.add_task(worker, data.model_size)
    return R.success(msg="模型下载已开始")


@router.get("/sys_health")
async def sys_health():
    """Report whether ffmpeg is available on this host."""
    try:
        ensure_ffmpeg_or_raise()
        return R.success()
    except EnvironmentError:
        return R.error(msg="系统未安装 ffmpeg 请先进行安装")


@router.get("/sys_check")
async def sys_check():
    """Liveness probe: always returns success."""
    return R.success()


@router.get("/deploy_status")
async def deploy_status():
    """Return all status information needed by the deployment monitor.

    Every item is fault-tolerant: a missing torch install, an unreadable
    transcriber config, an unavailable ffmpeg or an invalid BACKEND_PORT
    must not turn the whole endpoint into a 500 and break the monitor page.
    """
    # CUDA status: torch is only needed on the fast-whisper path and may be
    # absent in lightweight deployments.
    cuda_info = {"available": False, "version": None, "gpu_name": None, "torch_installed": False}
    try:
        import torch

        cuda_info["torch_installed"] = True
        cuda_available = torch.cuda.is_available()
        cuda_info["available"] = cuda_available
        if cuda_available:
            cuda_info["version"] = torch.version.cuda
            try:
                cuda_info["gpu_name"] = torch.cuda.get_device_name(0)
            except Exception as e:
                logger.warning(f"读取 GPU 名称失败: {e}")
    except ImportError:
        # torch not installed: keep available=False; clients can tell via
        # torch_installed.
        pass
    except Exception as e:
        logger.warning(f"CUDA 状态检测失败: {e}")

    # Whisper config: any failure falls back to None values.
    model_size = None
    transcriber_type = None
    try:
        transcriber_cfg = transcriber_config_manager.get_config()
        model_size = transcriber_cfg.get("whisper_model_size")
        transcriber_type = transcriber_cfg.get("transcriber_type")
    except Exception as e:
        logger.warning(f"读取转写器配置失败: {e}")

    # FFmpeg status.
    try:
        ensure_ffmpeg_or_raise()
        ffmpeg_ok = True
    except Exception:
        ffmpeg_ok = False

    # Backend port: a non-numeric BACKEND_PORT must not raise here.
    raw_port = os.getenv("BACKEND_PORT", 8483)
    try:
        backend_port = int(raw_port)
    except ValueError:
        logger.warning(f"BACKEND_PORT 配置无效: {raw_port!r}, 回落到默认端口 8483")
        backend_port = 8483

    return R.success(data={
        "backend": {"status": "running", "port": backend_port},
        "cuda": cuda_info,
        "whisper": {"model_size": model_size, "transcriber_type": transcriber_type},
        "ffmpeg": {"available": ffmpeg_ok},
    })