mirror of
https://github.com/JefferyHcool/BiliNote.git
synced 2026-05-06 20:42:52 +08:00
feat(youtube): 使用 youtube-transcript-api 优先获取字幕,有字幕时跳过音频下载
- 新增 YouTubeSubtitleFetcher 模块,通过 youtube-transcript-api 获取字幕 - 重构笔记生成流程:缓存 → 平台字幕 → 按需下载 → 转写 fallback - 有字幕时仅提取视频元信息,不下载音视频文件 - 添加 youtube-transcript-api 依赖 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -22,7 +22,8 @@ class Downloader(ABC):
|
||||
|
||||
@abstractmethod
|
||||
def download(self, video_url: str, output_dir: str = None,
|
||||
quality: DownloadQuality = "fast", need_video: Optional[bool] = False) -> AudioDownloadResult:
|
||||
quality: DownloadQuality = "fast", need_video: Optional[bool] = False,
|
||||
skip_download: bool = False) -> AudioDownloadResult:
|
||||
'''
|
||||
|
||||
:param need_video:
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import json
|
||||
import logging
|
||||
from abc import ABC
|
||||
from typing import Union, Optional, List
|
||||
@@ -7,8 +6,9 @@ from typing import Union, Optional, List
|
||||
import yt_dlp
|
||||
|
||||
from app.downloaders.base import Downloader, DownloadQuality
|
||||
from app.downloaders.youtube_subtitle import YouTubeSubtitleFetcher
|
||||
from app.models.notes_model import AudioDownloadResult
|
||||
from app.models.transcriber_model import TranscriptResult, TranscriptSegment
|
||||
from app.models.transcriber_model import TranscriptResult
|
||||
from app.utils.path_helper import get_data_dir
|
||||
from app.utils.url_parser import extract_video_id
|
||||
|
||||
@@ -25,12 +25,13 @@ class YoutubeDownloader(Downloader, ABC):
|
||||
video_url: str,
|
||||
output_dir: Union[str, None] = None,
|
||||
quality: DownloadQuality = "fast",
|
||||
need_video:Optional[bool]=False
|
||||
need_video: Optional[bool] = False,
|
||||
skip_download: bool = False,
|
||||
) -> AudioDownloadResult:
|
||||
if output_dir is None:
|
||||
output_dir = get_data_dir()
|
||||
if not output_dir:
|
||||
output_dir=self.cache_data
|
||||
output_dir = self.cache_data
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
output_path = os.path.join(output_dir, "%(id)s.%(ext)s")
|
||||
@@ -42,15 +43,17 @@ class YoutubeDownloader(Downloader, ABC):
|
||||
'quiet': False,
|
||||
}
|
||||
|
||||
if skip_download:
|
||||
ydl_opts['skip_download'] = True
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(video_url, download=True)
|
||||
info = ydl.extract_info(video_url, download=not skip_download)
|
||||
video_id = info.get("id")
|
||||
title = info.get("title")
|
||||
duration = info.get("duration", 0)
|
||||
cover_url = info.get("thumbnail")
|
||||
ext = info.get("ext", "m4a") # 兜底用 m4a
|
||||
ext = info.get("ext", "m4a")
|
||||
audio_path = os.path.join(output_dir, f"{video_id}.{ext}")
|
||||
print('os.path.join(output_dir, f"{video_id}.{ext}")',os.path.join(output_dir, f"{video_id}.{ext}"))
|
||||
|
||||
return AudioDownloadResult(
|
||||
file_path=audio_path,
|
||||
@@ -59,8 +62,8 @@ class YoutubeDownloader(Downloader, ABC):
|
||||
cover_url=cover_url,
|
||||
platform="youtube",
|
||||
video_id=video_id,
|
||||
raw_info={'tags':info.get('tags')}, #全部返回会报错
|
||||
video_path=None # ❗音频下载不包含视频路径
|
||||
raw_info={'tags': info.get('tags')},
|
||||
video_path=None,
|
||||
)
|
||||
|
||||
def download_video(
|
||||
@@ -101,115 +104,20 @@ class YoutubeDownloader(Downloader, ABC):
|
||||
def download_subtitles(self, video_url: str, output_dir: str = None,
|
||||
langs: List[str] = None) -> Optional[TranscriptResult]:
|
||||
"""
|
||||
尝试获取YouTube视频字幕(优先人工字幕,其次自动生成)
|
||||
通过 YouTube InnerTube API 直接获取字幕(优先人工字幕,其次自动生成)。
|
||||
比 yt_dlp 方式更轻量,无需写临时文件到磁盘。
|
||||
|
||||
:param video_url: 视频链接
|
||||
:param output_dir: 输出路径
|
||||
:param output_dir: 未使用(保留接口兼容)
|
||||
:param langs: 优先语言列表
|
||||
:return: TranscriptResult 或 None
|
||||
"""
|
||||
if output_dir is None:
|
||||
output_dir = get_data_dir()
|
||||
if not output_dir:
|
||||
output_dir = self.cache_data
|
||||
os.makedirs(output_dir, exist_ok=True)
|
||||
|
||||
if langs is None:
|
||||
langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US']
|
||||
langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US', 'ja']
|
||||
|
||||
video_id = extract_video_id(video_url, "youtube")
|
||||
|
||||
ydl_opts = {
|
||||
'writesubtitles': True,
|
||||
'writeautomaticsub': True,
|
||||
'subtitleslangs': langs,
|
||||
'subtitlesformat': 'json3',
|
||||
'skip_download': True,
|
||||
'outtmpl': os.path.join(output_dir, f'{video_id}.%(ext)s'),
|
||||
'quiet': True,
|
||||
}
|
||||
|
||||
try:
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(video_url, download=True)
|
||||
|
||||
# 查找下载的字幕文件
|
||||
subtitles = info.get('requested_subtitles') or {}
|
||||
if not subtitles:
|
||||
logger.info(f"YouTube视频 {video_id} 没有可用字幕")
|
||||
return None
|
||||
|
||||
# 按优先级查找字幕文件
|
||||
subtitle_file = None
|
||||
detected_lang = None
|
||||
for lang in langs:
|
||||
if lang in subtitles:
|
||||
subtitle_file = os.path.join(output_dir, f"{video_id}.{lang}.json3")
|
||||
detected_lang = lang
|
||||
break
|
||||
|
||||
# 如果按优先级没找到,取第一个可用的
|
||||
if not subtitle_file:
|
||||
for lang, sub_info in subtitles.items():
|
||||
subtitle_file = os.path.join(output_dir, f"{video_id}.{lang}.json3")
|
||||
detected_lang = lang
|
||||
break
|
||||
|
||||
if not subtitle_file or not os.path.exists(subtitle_file):
|
||||
logger.info(f"字幕文件不存在: {subtitle_file}")
|
||||
return None
|
||||
|
||||
# 解析字幕文件
|
||||
return self._parse_json3_subtitle(subtitle_file, detected_lang)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"获取YouTube字幕失败: {e}")
|
||||
return None
|
||||
|
||||
def _parse_json3_subtitle(self, subtitle_file: str, language: str) -> Optional[TranscriptResult]:
|
||||
"""
|
||||
解析 json3 格式字幕文件
|
||||
|
||||
:param subtitle_file: 字幕文件路径
|
||||
:param language: 语言代码
|
||||
:return: TranscriptResult
|
||||
"""
|
||||
try:
|
||||
with open(subtitle_file, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
segments = []
|
||||
events = data.get('events', [])
|
||||
|
||||
for event in events:
|
||||
# json3 格式中时间单位是毫秒
|
||||
start_ms = event.get('tStartMs', 0)
|
||||
duration_ms = event.get('dDurationMs', 0)
|
||||
|
||||
# 提取文本
|
||||
segs = event.get('segs', [])
|
||||
text = ''.join(seg.get('utf8', '') for seg in segs).strip()
|
||||
|
||||
if text: # 只添加非空文本
|
||||
segments.append(TranscriptSegment(
|
||||
start=start_ms / 1000.0,
|
||||
end=(start_ms + duration_ms) / 1000.0,
|
||||
text=text
|
||||
))
|
||||
|
||||
if not segments:
|
||||
return None
|
||||
|
||||
full_text = ' '.join(seg.text for seg in segments)
|
||||
|
||||
logger.info(f"成功解析YouTube字幕,共 {len(segments)} 段")
|
||||
return TranscriptResult(
|
||||
language=language,
|
||||
full_text=full_text,
|
||||
segments=segments,
|
||||
raw={'source': 'youtube_subtitle', 'file': subtitle_file}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"解析字幕文件失败: {e}")
|
||||
return None
|
||||
fetcher = YouTubeSubtitleFetcher()
|
||||
print(
|
||||
f"尝试获取字幕,video_id={video_id}, langs={langs}"
|
||||
)
|
||||
return fetcher.fetch_subtitles(video_id, langs)
|
||||
|
||||
98
backend/app/downloaders/youtube_subtitle.py
Normal file
98
backend/app/downloaders/youtube_subtitle.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
通过 youtube-transcript-api 获取 YouTube 字幕。
|
||||
优先人工字幕,其次自动生成字幕。不依赖 yt_dlp,无需下载任何文件。
|
||||
"""
|
||||
|
||||
from typing import Optional, List
|
||||
|
||||
from youtube_transcript_api import YouTubeTranscriptApi
|
||||
|
||||
from app.models.transcriber_model import TranscriptResult, TranscriptSegment
|
||||
from app.utils.logger import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class YouTubeSubtitleFetcher:
|
||||
"""通过 youtube-transcript-api 获取 YouTube 字幕。"""
|
||||
|
||||
def __init__(self):
|
||||
self._api = YouTubeTranscriptApi()
|
||||
|
||||
def fetch_subtitles(
|
||||
self,
|
||||
video_id: str,
|
||||
langs: Optional[List[str]] = None,
|
||||
) -> Optional[TranscriptResult]:
|
||||
if langs is None:
|
||||
langs = ["zh-Hans", "zh", "zh-CN", "zh-TW", "en", "en-US", "ja"]
|
||||
|
||||
try:
|
||||
# 1. 列出所有可用字幕
|
||||
transcript_list = self._api.list(video_id)
|
||||
|
||||
available = []
|
||||
for t in transcript_list:
|
||||
available.append(
|
||||
f"{t.language_code}({'auto' if t.is_generated else 'manual'})"
|
||||
)
|
||||
logger.info(f"可用字幕轨道: {', '.join(available)}")
|
||||
|
||||
# 2. 按优先级查找:先人工字幕,再自动字幕
|
||||
transcript = None
|
||||
try:
|
||||
transcript = transcript_list.find_manually_created_transcript(langs)
|
||||
logger.info(f"选中人工字幕: {transcript.language_code} ({transcript.language})")
|
||||
except Exception:
|
||||
try:
|
||||
transcript = transcript_list.find_generated_transcript(langs)
|
||||
logger.info(f"选中自动字幕: {transcript.language_code} ({transcript.language})")
|
||||
except Exception:
|
||||
# 都没匹配,取第一个可用的
|
||||
for t in transcript_list:
|
||||
transcript = t
|
||||
source = "auto" if t.is_generated else "manual"
|
||||
logger.info(f"使用首个可用字幕: {t.language_code} ({source})")
|
||||
break
|
||||
|
||||
if not transcript:
|
||||
logger.info(f"YouTube 视频 {video_id} 没有任何可用字幕")
|
||||
return None
|
||||
|
||||
# 3. 获取字幕内容
|
||||
fetched = transcript.fetch()
|
||||
segments = []
|
||||
for snippet in fetched:
|
||||
text = snippet.get("text", "").strip() if isinstance(snippet, dict) else str(snippet).strip()
|
||||
if not text:
|
||||
continue
|
||||
start = snippet.get("start", 0) if isinstance(snippet, dict) else 0
|
||||
duration = snippet.get("duration", 0) if isinstance(snippet, dict) else 0
|
||||
segments.append(TranscriptSegment(
|
||||
start=float(start),
|
||||
end=float(start) + float(duration),
|
||||
text=text,
|
||||
))
|
||||
|
||||
if not segments:
|
||||
logger.warning(f"YouTube 字幕内容为空: {video_id}")
|
||||
return None
|
||||
|
||||
full_text = " ".join(seg.text for seg in segments)
|
||||
logger.info(f"成功获取 YouTube 字幕,共 {len(segments)} 段")
|
||||
|
||||
return TranscriptResult(
|
||||
language=transcript.language_code,
|
||||
full_text=full_text,
|
||||
segments=segments,
|
||||
raw={
|
||||
"source": "youtube_transcript_api",
|
||||
"language": transcript.language,
|
||||
"language_code": transcript.language_code,
|
||||
"is_generated": transcript.is_generated,
|
||||
},
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"YouTube 字幕获取失败: {e}")
|
||||
return None
|
||||
@@ -133,8 +133,46 @@ class NoteGenerator:
|
||||
audio_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_audio.json"
|
||||
transcript_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_transcript.json"
|
||||
markdown_cache_file = NOTE_OUTPUT_DIR / f"{task_id}_markdown.md"
|
||||
print(audio_cache_file)
|
||||
# 1. 下载音频/视频
|
||||
# 1. 获取字幕/转写:优先缓存 → 平台字幕 → 音频转写
|
||||
transcript = None
|
||||
|
||||
# 尝试读取缓存
|
||||
if transcript_cache_file.exists():
|
||||
logger.info(f"检测到转写缓存 ({transcript_cache_file}),尝试读取")
|
||||
try:
|
||||
data = json.loads(transcript_cache_file.read_text(encoding="utf-8"))
|
||||
segments = [TranscriptSegment(**seg) for seg in data.get("segments", [])]
|
||||
transcript = TranscriptResult(
|
||||
language=data.get("language"),
|
||||
full_text=data["full_text"],
|
||||
segments=segments,
|
||||
)
|
||||
logger.info(f"已从缓存加载转写结果,共 {len(segments)} 段")
|
||||
except Exception as e:
|
||||
logger.warning(f"加载转写缓存失败: {e}")
|
||||
|
||||
# 缓存没有,尝试获取平台字幕
|
||||
if transcript is None:
|
||||
logger.info("尝试获取平台字幕(优先于音频下载)...")
|
||||
try:
|
||||
transcript = downloader.download_subtitles(video_url)
|
||||
if transcript and transcript.segments:
|
||||
logger.info(f"成功获取平台字幕,共 {len(transcript.segments)} 段")
|
||||
transcript_cache_file.write_text(
|
||||
json.dumps(asdict(transcript), ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
else:
|
||||
transcript = None
|
||||
logger.info("平台无可用字幕,将下载音频后转写")
|
||||
except Exception as e:
|
||||
logger.warning(f"获取平台字幕失败: {e},将下载音频后转写")
|
||||
transcript = None
|
||||
|
||||
# 2. 下载音频/视频
|
||||
# 有字幕时只提取元信息,不下载音视频文件(除非需要截图/视频理解)
|
||||
has_transcript = transcript is not None
|
||||
need_full_download = not has_transcript or screenshot or video_understanding
|
||||
audio_meta = self._download_media(
|
||||
downloader=downloader,
|
||||
video_url=video_url,
|
||||
@@ -147,18 +185,19 @@ class NoteGenerator:
|
||||
video_understanding=video_understanding,
|
||||
video_interval=video_interval,
|
||||
grid_size=grid_size,
|
||||
skip_download=not need_full_download,
|
||||
)
|
||||
|
||||
# 2. 获取字幕/转写文字
|
||||
# 优先尝试获取平台字幕,没有再 fallback 到音频转写
|
||||
transcript = self._get_transcript(
|
||||
downloader=downloader,
|
||||
video_url=video_url,
|
||||
audio_file=audio_meta.file_path,
|
||||
transcript_cache_file=transcript_cache_file,
|
||||
status_phase=TaskStatus.TRANSCRIBING,
|
||||
task_id=task_id,
|
||||
)
|
||||
# 3. 如果前面没拿到字幕,走转写流程
|
||||
if transcript is None:
|
||||
transcript = self._get_transcript(
|
||||
downloader=downloader,
|
||||
video_url=video_url,
|
||||
audio_file=audio_meta.file_path,
|
||||
transcript_cache_file=transcript_cache_file,
|
||||
status_phase=TaskStatus.TRANSCRIBING,
|
||||
task_id=task_id,
|
||||
)
|
||||
|
||||
# 3. GPT 总结
|
||||
markdown = self._summarize_text(
|
||||
@@ -331,6 +370,7 @@ class NoteGenerator:
|
||||
video_understanding: bool,
|
||||
video_interval: int,
|
||||
grid_size: List[int],
|
||||
skip_download: bool = False,
|
||||
) -> AudioDownloadResult | None:
|
||||
"""
|
||||
1. 检查音频缓存;若不存在,则根据需要下载音频或视频(若需截图/可视化)。
|
||||
@@ -353,7 +393,34 @@ class NoteGenerator:
|
||||
task_id = audio_cache_file.stem.split("_")[0]
|
||||
self._update_status(task_id, status_phase)
|
||||
|
||||
# 已有缓存,尝试加载
|
||||
if audio_cache_file.exists():
|
||||
logger.info(f"检测到音频缓存 ({audio_cache_file}),直接读取")
|
||||
try:
|
||||
data = json.loads(audio_cache_file.read_text(encoding="utf-8"))
|
||||
return AudioDownloadResult(**data)
|
||||
except Exception as e:
|
||||
logger.warning(f"读取音频缓存失败,将重新下载:{e}")
|
||||
|
||||
# 有字幕且不需要截图/视频理解时,只提取元信息不下载文件
|
||||
if skip_download:
|
||||
logger.info("已有字幕,仅提取视频元信息(不下载音视频)")
|
||||
try:
|
||||
audio = downloader.download(
|
||||
video_url=video_url,
|
||||
quality=quality,
|
||||
output_dir=output_path,
|
||||
need_video=False,
|
||||
skip_download=True,
|
||||
)
|
||||
audio_cache_file.write_text(
|
||||
json.dumps(asdict(audio), ensure_ascii=False, indent=2),
|
||||
encoding="utf-8",
|
||||
)
|
||||
logger.info(f"元信息提取完成 ({audio_cache_file})")
|
||||
return audio
|
||||
except Exception as exc:
|
||||
logger.warning(f"元信息提取失败,将尝试完整下载: {exc}")
|
||||
|
||||
# 判断是否需要下载视频
|
||||
need_video = screenshot or video_understanding
|
||||
@@ -368,9 +435,8 @@ class NoteGenerator:
|
||||
self.video_path = Path(video_path_str)
|
||||
logger.info(f"视频下载完成:{self.video_path}")
|
||||
|
||||
# 若指定了 grid_size,则生成缩略图
|
||||
if grid_size:
|
||||
self.video_img_urls=VideoReader(
|
||||
self.video_img_urls = VideoReader(
|
||||
video_path=str(self.video_path),
|
||||
grid_size=tuple(grid_size),
|
||||
frame_interval=frame_interval,
|
||||
@@ -382,17 +448,9 @@ class NoteGenerator:
|
||||
logger.info("未指定 grid_size,跳过缩略图生成")
|
||||
except Exception as exc:
|
||||
logger.error(f"视频下载失败:{exc}")
|
||||
|
||||
self._handle_exception(task_id, exc)
|
||||
raise
|
||||
# 已有缓存,尝试加载
|
||||
if audio_cache_file.exists():
|
||||
logger.info(f"检测到音频缓存 ({audio_cache_file}),直接读取")
|
||||
try:
|
||||
data = json.loads(audio_cache_file.read_text(encoding="utf-8"))
|
||||
return AudioDownloadResult(**data)
|
||||
except Exception as e:
|
||||
logger.warning(f"读取音频缓存失败,将重新下载:{e}")
|
||||
|
||||
# 下载音频
|
||||
try:
|
||||
logger.info("开始下载音频")
|
||||
@@ -402,7 +460,6 @@ class NoteGenerator:
|
||||
output_dir=output_path,
|
||||
need_video=need_video,
|
||||
)
|
||||
# 缓存 audio 元信息到本地 JSON
|
||||
audio_cache_file.write_text(json.dumps(asdict(audio), ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
logger.info(f"音频下载并缓存成功 ({audio_cache_file})")
|
||||
return audio
|
||||
|
||||
@@ -124,5 +124,6 @@ weasyprint==65.1
|
||||
webencodings==0.5.1
|
||||
websockets==15.0.1
|
||||
yarl==1.19.0
|
||||
youtube-transcript-api>=1.0.0
|
||||
yt-dlp==2025.3.31
|
||||
zopfli==0.2.3.post1
|
||||
|
||||
Reference in New Issue
Block a user