Merge pull request #301 from JefferyHcool/feature/youtube-subtitle-innertube

feat(youtube): 使用 youtube-transcript-api 优先获取字幕,有字幕时跳过音频下载
This commit is contained in:
Jianwu Huang
2026-03-23 17:32:48 +08:00
committed by GitHub
5 changed files with 203 additions and 138 deletions

View File

@@ -22,7 +22,8 @@ class Downloader(ABC):
@abstractmethod
def download(self, video_url: str, output_dir: str = None,
quality: DownloadQuality = "fast", need_video: Optional[bool] = False) -> AudioDownloadResult:
quality: DownloadQuality = "fast", need_video: Optional[bool] = False,
skip_download: bool = False) -> AudioDownloadResult:
'''
:param need_video:

View File

@@ -1,5 +1,4 @@
import os
import json
import logging
from abc import ABC
from typing import Union, Optional, List
@@ -7,8 +6,9 @@ from typing import Union, Optional, List
import yt_dlp
from app.downloaders.base import Downloader, DownloadQuality
from app.downloaders.youtube_subtitle import YouTubeSubtitleFetcher
from app.models.notes_model import AudioDownloadResult
from app.models.transcriber_model import TranscriptResult, TranscriptSegment
from app.models.transcriber_model import TranscriptResult
from app.utils.path_helper import get_data_dir
from app.utils.url_parser import extract_video_id
@@ -25,12 +25,13 @@ class YoutubeDownloader(Downloader, ABC):
video_url: str,
output_dir: Union[str, None] = None,
quality: DownloadQuality = "fast",
need_video:Optional[bool]=False
need_video: Optional[bool] = False,
skip_download: bool = False,
) -> AudioDownloadResult:
if output_dir is None:
output_dir = get_data_dir()
if not output_dir:
output_dir=self.cache_data
output_dir = self.cache_data
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "%(id)s.%(ext)s")
@@ -42,15 +43,17 @@ class YoutubeDownloader(Downloader, ABC):
'quiet': False,
}
if skip_download:
ydl_opts['skip_download'] = True
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=True)
info = ydl.extract_info(video_url, download=not skip_download)
video_id = info.get("id")
title = info.get("title")
duration = info.get("duration", 0)
cover_url = info.get("thumbnail")
ext = info.get("ext", "m4a") # 兜底用 m4a
ext = info.get("ext", "m4a")
audio_path = os.path.join(output_dir, f"{video_id}.{ext}")
print('os.path.join(output_dir, f"{video_id}.{ext}")',os.path.join(output_dir, f"{video_id}.{ext}"))
return AudioDownloadResult(
file_path=audio_path,
@@ -59,8 +62,8 @@ class YoutubeDownloader(Downloader, ABC):
cover_url=cover_url,
platform="youtube",
video_id=video_id,
raw_info={'tags':info.get('tags')}, #全部返回会报错
video_path=None # ❗音频下载不包含视频路径
raw_info={'tags': info.get('tags')},
video_path=None,
)
def download_video(
@@ -101,115 +104,20 @@ class YoutubeDownloader(Downloader, ABC):
def download_subtitles(self, video_url: str, output_dir: str = None,
langs: List[str] = None) -> Optional[TranscriptResult]:
"""
尝试获取YouTube视频字幕(优先人工字幕,其次自动生成)
通过 YouTube InnerTube API 直接获取字幕(优先人工字幕,其次自动生成)
比 yt_dlp 方式更轻量,无需写临时文件到磁盘。
:param video_url: 视频链接
:param output_dir: 输出路径
:param output_dir: 未使用(保留接口兼容)
:param langs: 优先语言列表
:return: TranscriptResult 或 None
"""
if output_dir is None:
output_dir = get_data_dir()
if not output_dir:
output_dir = self.cache_data
os.makedirs(output_dir, exist_ok=True)
if langs is None:
langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US']
langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US', 'ja']
video_id = extract_video_id(video_url, "youtube")
ydl_opts = {
'writesubtitles': True,
'writeautomaticsub': True,
'subtitleslangs': langs,
'subtitlesformat': 'json3',
'skip_download': True,
'outtmpl': os.path.join(output_dir, f'{video_id}.%(ext)s'),
'quiet': True,
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=True)
# 查找下载的字幕文件
subtitles = info.get('requested_subtitles') or {}
if not subtitles:
logger.info(f"YouTube视频 {video_id} 没有可用字幕")
return None
# 按优先级查找字幕文件
subtitle_file = None
detected_lang = None
for lang in langs:
if lang in subtitles:
subtitle_file = os.path.join(output_dir, f"{video_id}.{lang}.json3")
detected_lang = lang
break
# 如果按优先级没找到,取第一个可用的
if not subtitle_file:
for lang, sub_info in subtitles.items():
subtitle_file = os.path.join(output_dir, f"{video_id}.{lang}.json3")
detected_lang = lang
break
if not subtitle_file or not os.path.exists(subtitle_file):
logger.info(f"字幕文件不存在: {subtitle_file}")
return None
# 解析字幕文件
return self._parse_json3_subtitle(subtitle_file, detected_lang)
except Exception as e:
logger.warning(f"获取YouTube字幕失败: {e}")
return None
def _parse_json3_subtitle(self, subtitle_file: str, language: str) -> Optional[TranscriptResult]:
"""
解析 json3 格式字幕文件
:param subtitle_file: 字幕文件路径
:param language: 语言代码
:return: TranscriptResult
"""
try:
with open(subtitle_file, 'r', encoding='utf-8') as f:
data = json.load(f)
segments = []
events = data.get('events', [])
for event in events:
# json3 格式中时间单位是毫秒
start_ms = event.get('tStartMs', 0)
duration_ms = event.get('dDurationMs', 0)
# 提取文本
segs = event.get('segs', [])
text = ''.join(seg.get('utf8', '') for seg in segs).strip()
if text: # 只添加非空文本
segments.append(TranscriptSegment(
start=start_ms / 1000.0,
end=(start_ms + duration_ms) / 1000.0,
text=text
))
if not segments:
return None
full_text = ' '.join(seg.text for seg in segments)
logger.info(f"成功解析YouTube字幕{len(segments)}")
return TranscriptResult(
language=language,
full_text=full_text,
segments=segments,
raw={'source': 'youtube_subtitle', 'file': subtitle_file}
)
except Exception as e:
logger.warning(f"解析字幕文件失败: {e}")
return None
fetcher = YouTubeSubtitleFetcher()
print(
f"尝试获取字幕video_id={video_id}, langs={langs}"
)
return fetcher.fetch_subtitles(video_id, langs)

View File

@@ -0,0 +1,98 @@
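"""
Fetch YouTube subtitles through youtube-transcript-api.
Manually created subtitles are preferred, then auto-generated ones. Does not depend on yt_dlp, and no files need to be downloaded.
"""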
from typing import Optional, List
from youtube_transcript_api import YouTubeTranscriptApi
from app.models.transcriber_model import TranscriptResult, TranscriptSegment
from app.utils.logger import get_logger
logger = get_logger(__name__)
class YouTubeSubtitleFetcher:
    """Fetch YouTube subtitles through youtube-transcript-api.

    Track selection order: a manually created track in one of the requested
    languages, then an auto-generated track in one of the requested
    languages, then the first track the video offers.
    """

    # Language codes tried, in priority order, when the caller passes none.
    _DEFAULT_LANGS = ("zh-Hans", "zh", "zh-CN", "zh-TW", "en", "en-US", "ja")

    def __init__(self):
        self._api = YouTubeTranscriptApi()

    def fetch_subtitles(
        self,
        video_id: str,
        langs: Optional[List[str]] = None,
    ) -> Optional[TranscriptResult]:
        """Return the subtitles of ``video_id`` as a ``TranscriptResult``.

        :param video_id: YouTube video id.
        :param langs: preferred language codes in priority order; a built-in
            default list is used when ``None``.
        :return: the transcript, or ``None`` when the video has no subtitle
            track, the selected track has no non-empty text, or any error
            occurs (errors are logged, never raised).
        """
        if langs is None:
            langs = list(self._DEFAULT_LANGS)

        try:
            # 1. List every available track (logged for diagnostics).
            transcript_list = self._api.list(video_id)
            available = [
                f"{t.language_code}({'auto' if t.is_generated else 'manual'})"
                for t in transcript_list
            ]
            logger.info(f"可用字幕轨道: {', '.join(available)}")

            # 2. Pick a track: manual first, then generated, then anything.
            transcript = self._select_transcript(transcript_list, langs)
            if transcript is None:
                logger.info(f"YouTube 视频 {video_id} 没有任何可用字幕")
                return None

            # 3. Fetch the content and convert it to segments.
            segments = []
            for snippet in transcript.fetch():
                text, start, duration = self._snippet_fields(snippet)
                if not text:
                    continue
                segments.append(TranscriptSegment(
                    start=start,
                    end=start + duration,
                    text=text,
                ))

            if not segments:
                logger.warning(f"YouTube 字幕内容为空: {video_id}")
                return None

            full_text = " ".join(seg.text for seg in segments)
            logger.info(f"成功获取 YouTube 字幕,共 {len(segments)}")
            return TranscriptResult(
                language=transcript.language_code,
                full_text=full_text,
                segments=segments,
                raw={
                    "source": "youtube_transcript_api",
                    "language": transcript.language,
                    "language_code": transcript.language_code,
                    "is_generated": transcript.is_generated,
                },
            )
        except Exception as e:
            # Best-effort contract: callers treat None as "no subtitles".
            logger.warning(f"YouTube 字幕获取失败: {e}")
            return None

    @staticmethod
    def _select_transcript(transcript_list, langs):
        """Pick a track from ``transcript_list``, or ``None`` if it is empty."""
        finders = (
            (transcript_list.find_manually_created_transcript, "选中人工字幕"),
            (transcript_list.find_generated_transcript, "选中自动字幕"),
        )
        for find, label in finders:
            try:
                transcript = find(langs)
            except Exception:
                # No track of this kind matches the requested languages;
                # kept broad so any lookup failure falls through to the
                # next strategy, as before.
                continue
            logger.info(f"{label}: {transcript.language_code} ({transcript.language})")
            return transcript

        # Nothing matched the requested languages: take the first track.
        first = next(iter(transcript_list), None)
        if first is not None:
            source = "auto" if first.is_generated else "manual"
            logger.info(f"使用首个可用字幕: {first.language_code} ({source})")
        return first

    @staticmethod
    def _snippet_fields(snippet):
        """Return ``(text, start, duration)`` for one fetched snippet.

        youtube-transcript-api 1.x yields snippet objects exposing ``text``,
        ``start`` and ``duration`` as attributes, while older releases
        returned plain dicts. Both shapes are accepted; anything else falls
        back to ``str(snippet)`` with zero timing.
        """
        if isinstance(snippet, dict):
            text = snippet.get("text", "")
            start = snippet.get("start", 0)
            duration = snippet.get("duration", 0)
        else:
            text = getattr(snippet, "text", None)
            if text is None:
                text = str(snippet)
            start = getattr(snippet, "start", 0)
            duration = getattr(snippet, "duration", 0)
        return (text or "").strip(), float(start or 0), float(duration or 0)

View File

@@ -133,8 +133,46 @@ class NoteGenerator:
audio_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_audio.json"
transcript_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_transcript.json"
markdown_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_markdown.md"
print(audio_cache_file)
# 1. 下载音频/视频
# 1. 获取字幕/转写:优先缓存 → 平台字幕 → 音频转写
transcript = None
# 尝试读取缓存
if transcript_cache_file.exists():
logger.info(f"检测到转写缓存 ({transcript_cache_file}),尝试读取")
try:
data = json.loads(transcript_cache_file.read_text(encoding="utf-8"))
segments = [TranscriptSegment(**seg) for seg in data.get("segments", [])]
transcript = TranscriptResult(
language=data.get("language"),
full_text=data["full_text"],
segments=segments,
)
logger.info(f"已从缓存加载转写结果,共 {len(segments)}")
except Exception as e:
logger.warning(f"加载转写缓存失败: {e}")
# 缓存没有,尝试获取平台字幕
if transcript is None:
logger.info("尝试获取平台字幕(优先于音频下载)...")
try:
transcript = downloader.download_subtitles(video_url)
if transcript and transcript.segments:
logger.info(f"成功获取平台字幕,共 {len(transcript.segments)}")
transcript_cache_file.write_text(
json.dumps(asdict(transcript), ensure_ascii=False, indent=2),
encoding="utf-8",
)
else:
transcript = None
logger.info("平台无可用字幕,将下载音频后转写")
except Exception as e:
logger.warning(f"获取平台字幕失败: {e},将下载音频后转写")
transcript = None
# 2. 下载音频/视频
# 有字幕时只提取元信息,不下载音视频文件(除非需要截图/视频理解)
has_transcript = transcript is not None
need_full_download = not has_transcript or screenshot or video_understanding
audio_meta = self._download_media(
downloader=downloader,
video_url=video_url,
@@ -147,18 +185,19 @@ class NoteGenerator:
video_understanding=video_understanding,
video_interval=video_interval,
grid_size=grid_size,
skip_download=not need_full_download,
)
# 2. 获取字幕/转写文字
# 优先尝试获取平台字幕,没有再 fallback 到音频转写
transcript = self._get_transcript(
downloader=downloader,
video_url=video_url,
audio_file=audio_meta.file_path,
transcript_cache_file=transcript_cache_file,
status_phase=TaskStatus.TRANSCRIBING,
task_id=task_id,
)
# 3. 如果前面没拿到字幕,走转写流程
if transcript is None:
transcript = self._get_transcript(
downloader=downloader,
video_url=video_url,
audio_file=audio_meta.file_path,
transcript_cache_file=transcript_cache_file,
status_phase=TaskStatus.TRANSCRIBING,
task_id=task_id,
)
# 3. GPT 总结
markdown = self._summarize_text(
@@ -331,6 +370,7 @@ class NoteGenerator:
video_understanding: bool,
video_interval: int,
grid_size: List[int],
skip_download: bool = False,
) -> AudioDownloadResult | None:
"""
1. 检查音频缓存;若不存在,则根据需要下载音频或视频(若需截图/可视化)。
@@ -353,7 +393,34 @@ class NoteGenerator:
task_id = audio_cache_file.stem.split("_")[0]
self._update_status(task_id, status_phase)
# 已有缓存,尝试加载
if audio_cache_file.exists():
logger.info(f"检测到音频缓存 ({audio_cache_file}),直接读取")
try:
data = json.loads(audio_cache_file.read_text(encoding="utf-8"))
return AudioDownloadResult(**data)
except Exception as e:
logger.warning(f"读取音频缓存失败,将重新下载:{e}")
# 有字幕且不需要截图/视频理解时,只提取元信息不下载文件
if skip_download:
logger.info("已有字幕,仅提取视频元信息(不下载音视频)")
try:
audio = downloader.download(
video_url=video_url,
quality=quality,
output_dir=output_path,
need_video=False,
skip_download=True,
)
audio_cache_file.write_text(
json.dumps(asdict(audio), ensure_ascii=False, indent=2),
encoding="utf-8",
)
logger.info(f"元信息提取完成 ({audio_cache_file})")
return audio
except Exception as exc:
logger.warning(f"元信息提取失败,将尝试完整下载: {exc}")
# 判断是否需要下载视频
need_video = screenshot or video_understanding
@@ -368,9 +435,8 @@ class NoteGenerator:
self.video_path = Path(video_path_str)
logger.info(f"视频下载完成:{self.video_path}")
# 若指定了 grid_size则生成缩略图
if grid_size:
self.video_img_urls=VideoReader(
self.video_img_urls = VideoReader(
video_path=str(self.video_path),
grid_size=tuple(grid_size),
frame_interval=frame_interval,
@@ -382,17 +448,9 @@ class NoteGenerator:
logger.info("未指定 grid_size跳过缩略图生成")
except Exception as exc:
logger.error(f"视频下载失败:{exc}")
self._handle_exception(task_id, exc)
raise
# 已有缓存,尝试加载
if audio_cache_file.exists():
logger.info(f"检测到音频缓存 ({audio_cache_file}),直接读取")
try:
data = json.loads(audio_cache_file.read_text(encoding="utf-8"))
return AudioDownloadResult(**data)
except Exception as e:
logger.warning(f"读取音频缓存失败,将重新下载:{e}")
# 下载音频
try:
logger.info("开始下载音频")
@@ -402,7 +460,6 @@ class NoteGenerator:
output_dir=output_path,
need_video=need_video,
)
# 缓存 audio 元信息到本地 JSON
audio_cache_file.write_text(json.dumps(asdict(audio), ensure_ascii=False, indent=2), encoding="utf-8")
logger.info(f"音频下载并缓存成功 ({audio_cache_file})")
return audio

View File

@@ -124,5 +124,6 @@ weasyprint==65.1
webencodings==0.5.1
websockets==15.0.1
yarl==1.19.0
youtube-transcript-api>=1.0.0
yt-dlp==2025.3.31
zopfli==0.2.3.post1