From f4801d5be7238537dd84ba103dca605ac02d09c3 Mon Sep 17 00:00:00 2001 From: huangjianwu Date: Mon, 23 Mar 2026 17:31:30 +0800 Subject: [PATCH] =?UTF-8?q?feat(youtube):=20=E4=BD=BF=E7=94=A8=20youtube-t?= =?UTF-8?q?ranscript-api=20=E4=BC=98=E5=85=88=E8=8E=B7=E5=8F=96=E5=AD=97?= =?UTF-8?q?=E5=B9=95=EF=BC=8C=E6=9C=89=E5=AD=97=E5=B9=95=E6=97=B6=E8=B7=B3?= =?UTF-8?q?=E8=BF=87=E9=9F=B3=E9=A2=91=E4=B8=8B=E8=BD=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 YouTubeSubtitleFetcher 模块,通过 youtube-transcript-api 获取字幕 - 重构笔记生成流程:缓存 → 平台字幕 → 按需下载 → 转写 fallback - 有字幕时仅提取视频元信息,不下载音视频文件 - 添加 youtube-transcript-api 依赖 Co-Authored-By: Claude Opus 4.6 --- backend/app/downloaders/base.py | 3 +- backend/app/downloaders/youtube_downloader.py | 134 +++--------------- backend/app/downloaders/youtube_subtitle.py | 98 +++++++++++++ backend/app/services/note.py | 105 ++++++++++---- backend/requirements.txt | 1 + 5 files changed, 203 insertions(+), 138 deletions(-) create mode 100644 backend/app/downloaders/youtube_subtitle.py diff --git a/backend/app/downloaders/base.py b/backend/app/downloaders/base.py index a4dfb07..d1e71ad 100644 --- a/backend/app/downloaders/base.py +++ b/backend/app/downloaders/base.py @@ -22,7 +22,8 @@ class Downloader(ABC): @abstractmethod def download(self, video_url: str, output_dir: str = None, - quality: DownloadQuality = "fast", need_video: Optional[bool] = False) -> AudioDownloadResult: + quality: DownloadQuality = "fast", need_video: Optional[bool] = False, + skip_download: bool = False) -> AudioDownloadResult: ''' :param need_video: diff --git a/backend/app/downloaders/youtube_downloader.py b/backend/app/downloaders/youtube_downloader.py index 2a081bb..bb8ed8a 100644 --- a/backend/app/downloaders/youtube_downloader.py +++ b/backend/app/downloaders/youtube_downloader.py @@ -1,5 +1,4 @@ import os -import json import logging from abc import ABC from typing import Union, Optional, List @@ -7,8 +6,9 @@ from typing import Union, Optional, List import yt_dlp from app.downloaders.base import Downloader, DownloadQuality +from app.downloaders.youtube_subtitle import YouTubeSubtitleFetcher from app.models.notes_model import AudioDownloadResult -from app.models.transcriber_model import TranscriptResult, TranscriptSegment +from app.models.transcriber_model import TranscriptResult from app.utils.path_helper import get_data_dir from app.utils.url_parser import extract_video_id @@ -25,12 +25,13 @@ class YoutubeDownloader(Downloader, ABC): video_url: str, output_dir: Union[str, None] = None, quality: DownloadQuality = "fast", - need_video:Optional[bool]=False + need_video: Optional[bool] = False, + skip_download: bool = False, ) -> AudioDownloadResult: if output_dir is None: output_dir = get_data_dir() if not output_dir: - output_dir=self.cache_data + output_dir = self.cache_data os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, "%(id)s.%(ext)s") @@ -42,15 +43,17 @@ class YoutubeDownloader(Downloader, ABC): 'quiet': False, } + if skip_download: + ydl_opts['skip_download'] = True + with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info = ydl.extract_info(video_url, download=True) + info = ydl.extract_info(video_url, download=not skip_download) video_id = info.get("id") title = info.get("title") duration = info.get("duration", 0) cover_url = info.get("thumbnail") - ext = info.get("ext", "m4a") # 兜底用 m4a + ext = info.get("ext", "m4a") audio_path = os.path.join(output_dir, f"{video_id}.{ext}") - print('os.path.join(output_dir, f"{video_id}.{ext}")',os.path.join(output_dir, f"{video_id}.{ext}")) return AudioDownloadResult( file_path=audio_path, @@ -59,8 +62,8 @@ class YoutubeDownloader(Downloader, ABC): cover_url=cover_url, platform="youtube", video_id=video_id, - raw_info={'tags':info.get('tags')}, #全部返回会报错 - video_path=None # ❗音频下载不包含视频路径 + raw_info={'tags': info.get('tags')}, + video_path=None, ) def download_video( @@ -101,115 +104,20 @@ class YoutubeDownloader(Downloader, ABC): def download_subtitles(self, video_url: str, output_dir: str = None, langs: List[str] = None) -> Optional[TranscriptResult]: """ - 尝试获取YouTube视频字幕(优先人工字幕,其次自动生成) + 通过 YouTube InnerTube API 直接获取字幕(优先人工字幕,其次自动生成)。 + 比 yt_dlp 方式更轻量,无需写临时文件到磁盘。 :param video_url: 视频链接 - :param output_dir: 输出路径 + :param output_dir: 未使用(保留接口兼容) :param langs: 优先语言列表 :return: TranscriptResult 或 None """ - if output_dir is None: - output_dir = get_data_dir() - if not output_dir: - output_dir = self.cache_data - os.makedirs(output_dir, exist_ok=True) - if langs is None: - langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US'] + langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US', 'ja'] video_id = extract_video_id(video_url, "youtube") - - ydl_opts = { - 'writesubtitles': True, - 'writeautomaticsub': True, - 'subtitleslangs': langs, - 'subtitlesformat': 'json3', - 'skip_download': True, - 'outtmpl': os.path.join(output_dir, f'{video_id}.%(ext)s'), - 'quiet': True, - } - - try: - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - info = ydl.extract_info(video_url, download=True) - - # 查找下载的字幕文件 - subtitles = info.get('requested_subtitles') or {} - if not subtitles: - logger.info(f"YouTube视频 {video_id} 没有可用字幕") - return None - - # 按优先级查找字幕文件 - subtitle_file = None - detected_lang = None - for lang in langs: - if lang in subtitles: - subtitle_file = os.path.join(output_dir, f"{video_id}.{lang}.json3") - detected_lang = lang - break - - # 如果按优先级没找到,取第一个可用的 - if not subtitle_file: - for lang, sub_info in subtitles.items(): - subtitle_file = os.path.join(output_dir, f"{video_id}.{lang}.json3") - detected_lang = lang - break - - if not subtitle_file or not os.path.exists(subtitle_file): - logger.info(f"字幕文件不存在: {subtitle_file}") - return None - - # 解析字幕文件 - return self._parse_json3_subtitle(subtitle_file, detected_lang) - - except Exception as e: - logger.warning(f"获取YouTube字幕失败: {e}") - return None - - def _parse_json3_subtitle(self, subtitle_file: str, language: str) -> Optional[TranscriptResult]: - """ - 解析 json3 格式字幕文件 - - :param subtitle_file: 字幕文件路径 - :param language: 语言代码 - :return: TranscriptResult - """ - try: - with open(subtitle_file, 'r', encoding='utf-8') as f: - data = json.load(f) - - segments = [] - events = data.get('events', []) - - for event in events: - # json3 格式中时间单位是毫秒 - start_ms = event.get('tStartMs', 0) - duration_ms = event.get('dDurationMs', 0) - - # 提取文本 - segs = event.get('segs', []) - text = ''.join(seg.get('utf8', '') for seg in segs).strip() - - if text: # 只添加非空文本 - segments.append(TranscriptSegment( - start=start_ms / 1000.0, - end=(start_ms + duration_ms) / 1000.0, - text=text - )) - - if not segments: - return None - - full_text = ' '.join(seg.text for seg in segments) - - logger.info(f"成功解析YouTube字幕,共 {len(segments)} 段") - return TranscriptResult( - language=language, - full_text=full_text, - segments=segments, - raw={'source': 'youtube_subtitle', 'file': subtitle_file} - ) - - except Exception as e: - logger.warning(f"解析字幕文件失败: {e}") - return None + fetcher = YouTubeSubtitleFetcher() + print( + f"尝试获取字幕,video_id={video_id}, langs={langs}" + ) + return fetcher.fetch_subtitles(video_id, langs) diff --git a/backend/app/downloaders/youtube_subtitle.py b/backend/app/downloaders/youtube_subtitle.py new file mode 100644 index 0000000..d59a871 --- /dev/null +++ b/backend/app/downloaders/youtube_subtitle.py @@ -0,0 +1,98 @@ +""" +通过 youtube-transcript-api 获取 YouTube 字幕。 +优先人工字幕,其次自动生成字幕。不依赖 yt_dlp,无需下载任何文件。 +""" + +from typing import Optional, List + +from youtube_transcript_api import YouTubeTranscriptApi + +from app.models.transcriber_model import TranscriptResult, TranscriptSegment +from app.utils.logger import get_logger + +logger = get_logger(__name__) + + +class YouTubeSubtitleFetcher: + """通过 youtube-transcript-api 获取 YouTube 字幕。""" + + def __init__(self): + self._api = YouTubeTranscriptApi() + + def fetch_subtitles( + self, + video_id: str, + langs: Optional[List[str]] = None, + ) -> Optional[TranscriptResult]: + if langs is None: + langs = ["zh-Hans", "zh", "zh-CN", "zh-TW", "en", "en-US", "ja"] + + try: + # 1. 列出所有可用字幕 + transcript_list = self._api.list(video_id) + + available = [] + for t in transcript_list: + available.append( + f"{t.language_code}({'auto' if t.is_generated else 'manual'})" + ) + logger.info(f"可用字幕轨道: {', '.join(available)}") + + # 2. 按优先级查找:先人工字幕,再自动字幕 + transcript = None + try: + transcript = transcript_list.find_manually_created_transcript(langs) + logger.info(f"选中人工字幕: {transcript.language_code} ({transcript.language})") + except Exception: + try: + transcript = transcript_list.find_generated_transcript(langs) + logger.info(f"选中自动字幕: {transcript.language_code} ({transcript.language})") + except Exception: + # 都没匹配,取第一个可用的 + for t in transcript_list: + transcript = t + source = "auto" if t.is_generated else "manual" + logger.info(f"使用首个可用字幕: {t.language_code} ({source})") + break + + if not transcript: + logger.info(f"YouTube 视频 {video_id} 没有任何可用字幕") + return None + + # 3. 获取字幕内容 + fetched = transcript.fetch() + segments = [] + for snippet in fetched: + text = snippet.get("text", "").strip() if isinstance(snippet, dict) else str(snippet).strip() + if not text: + continue + start = snippet.get("start", 0) if isinstance(snippet, dict) else 0 + duration = snippet.get("duration", 0) if isinstance(snippet, dict) else 0 + segments.append(TranscriptSegment( + start=float(start), + end=float(start) + float(duration), + text=text, + )) + + if not segments: + logger.warning(f"YouTube 字幕内容为空: {video_id}") + return None + + full_text = " ".join(seg.text for seg in segments) + logger.info(f"成功获取 YouTube 字幕,共 {len(segments)} 段") + + return TranscriptResult( + language=transcript.language_code, + full_text=full_text, + segments=segments, + raw={ + "source": "youtube_transcript_api", + "language": transcript.language, + "language_code": transcript.language_code, + "is_generated": transcript.is_generated, + }, + ) + + except Exception as e: + logger.warning(f"YouTube 字幕获取失败: {e}") + return None diff --git a/backend/app/services/note.py b/backend/app/services/note.py index 81d35c0..ebbe83a 100644 --- a/backend/app/services/note.py +++ b/backend/app/services/note.py @@ -133,8 +133,46 @@ class NoteGenerator: audio_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_audio.json" transcript_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_transcript.json" markdown_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_markdown.md" - print(audio_cache_file) - # 1. 下载音频/视频 + # 1. 获取字幕/转写:优先缓存 → 平台字幕 → 音频转写 + transcript = None + + # 尝试读取缓存 + if transcript_cache_file.exists(): + logger.info(f"检测到转写缓存 ({transcript_cache_file}),尝试读取") + try: + data = json.loads(transcript_cache_file.read_text(encoding="utf-8")) + segments = [TranscriptSegment(**seg) for seg in data.get("segments", [])] + transcript = TranscriptResult( + language=data.get("language"), + full_text=data["full_text"], + segments=segments, + ) + logger.info(f"已从缓存加载转写结果,共 {len(segments)} 段") + except Exception as e: + logger.warning(f"加载转写缓存失败: {e}") + + # 缓存没有,尝试获取平台字幕 + if transcript is None: + logger.info("尝试获取平台字幕(优先于音频下载)...") + try: + transcript = downloader.download_subtitles(video_url) + if transcript and transcript.segments: + logger.info(f"成功获取平台字幕,共 {len(transcript.segments)} 段") + transcript_cache_file.write_text( + json.dumps(asdict(transcript), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + else: + transcript = None + logger.info("平台无可用字幕,将下载音频后转写") + except Exception as e: + logger.warning(f"获取平台字幕失败: {e},将下载音频后转写") + transcript = None + + # 2. 下载音频/视频 + # 有字幕时只提取元信息,不下载音视频文件(除非需要截图/视频理解) + has_transcript = transcript is not None + need_full_download = not has_transcript or screenshot or video_understanding audio_meta = self._download_media( downloader=downloader, video_url=video_url, @@ -147,18 +185,19 @@ class NoteGenerator: video_understanding=video_understanding, video_interval=video_interval, grid_size=grid_size, + skip_download=not need_full_download, ) - # 2. 获取字幕/转写文字 - # 优先尝试获取平台字幕,没有再 fallback 到音频转写 - transcript = self._get_transcript( - downloader=downloader, - video_url=video_url, - audio_file=audio_meta.file_path, - transcript_cache_file=transcript_cache_file, - status_phase=TaskStatus.TRANSCRIBING, - task_id=task_id, - ) + # 3. 如果前面没拿到字幕,走转写流程 + if transcript is None: + transcript = self._get_transcript( + downloader=downloader, + video_url=video_url, + audio_file=audio_meta.file_path, + transcript_cache_file=transcript_cache_file, + status_phase=TaskStatus.TRANSCRIBING, + task_id=task_id, + ) # 3. GPT 总结 markdown = self._summarize_text( @@ -331,6 +370,7 @@ class NoteGenerator: video_understanding: bool, video_interval: int, grid_size: List[int], + skip_download: bool = False, ) -> AudioDownloadResult | None: """ 1. 检查音频缓存;若不存在,则根据需要下载音频或视频(若需截图/可视化)。 @@ -353,7 +393,34 @@ class NoteGenerator: task_id = audio_cache_file.stem.split("_")[0] self._update_status(task_id, status_phase) + # 已有缓存,尝试加载 + if audio_cache_file.exists(): + logger.info(f"检测到音频缓存 ({audio_cache_file}),直接读取") + try: + data = json.loads(audio_cache_file.read_text(encoding="utf-8")) + return AudioDownloadResult(**data) + except Exception as e: + logger.warning(f"读取音频缓存失败,将重新下载:{e}") + # 有字幕且不需要截图/视频理解时,只提取元信息不下载文件 + if skip_download: + logger.info("已有字幕,仅提取视频元信息(不下载音视频)") + try: + audio = downloader.download( + video_url=video_url, + quality=quality, + output_dir=output_path, + need_video=False, + skip_download=True, + ) + audio_cache_file.write_text( + json.dumps(asdict(audio), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + logger.info(f"元信息提取完成 ({audio_cache_file})") + return audio + except Exception as exc: + logger.warning(f"元信息提取失败,将尝试完整下载: {exc}") # 判断是否需要下载视频 need_video = screenshot or video_understanding @@ -368,9 +435,8 @@ class NoteGenerator: self.video_path = Path(video_path_str) logger.info(f"视频下载完成:{self.video_path}") - # 若指定了 grid_size,则生成缩略图 if grid_size: - self.video_img_urls=VideoReader( + self.video_img_urls = VideoReader( video_path=str(self.video_path), grid_size=tuple(grid_size), frame_interval=frame_interval, @@ -382,17 +448,9 @@ class NoteGenerator: logger.info("未指定 grid_size,跳过缩略图生成") except Exception as exc: logger.error(f"视频下载失败:{exc}") - self._handle_exception(task_id, exc) raise - # 已有缓存,尝试加载 - if audio_cache_file.exists(): - logger.info(f"检测到音频缓存 ({audio_cache_file}),直接读取") - try: - data = json.loads(audio_cache_file.read_text(encoding="utf-8")) - return AudioDownloadResult(**data) - except Exception as e: - logger.warning(f"读取音频缓存失败,将重新下载:{e}") + # 下载音频 try: logger.info("开始下载音频") @@ -402,7 +460,6 @@ class NoteGenerator: output_dir=output_path, need_video=need_video, ) - # 缓存 audio 元信息到本地 JSON audio_cache_file.write_text(json.dumps(asdict(audio), ensure_ascii=False, indent=2), encoding="utf-8") logger.info(f"音频下载并缓存成功 ({audio_cache_file})") return audio diff --git a/backend/requirements.txt b/backend/requirements.txt index b3afa64..b29b0ab 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -124,5 +124,6 @@ weasyprint==65.1 webencodings==0.5.1 websockets==15.0.1 yarl==1.19.0 +youtube-transcript-api>=1.0.0 yt-dlp==2025.3.31 zopfli==0.2.3.post1