Merge pull request #268 from nbzcy/feature/subtitle-priority-and-export-enhancements

feat: Add subtitle priority fetching and enhance mindmap export
This commit is contained in:
Jianwu Huang
2026-02-05 16:04:38 +08:00
committed by GitHub
6 changed files with 619 additions and 11 deletions

View File

@@ -5,6 +5,7 @@ from typing import Optional, Union
from app.enmus.note_enums import DownloadQuality
from app.models.notes_model import AudioDownloadResult
from app.models.transcriber_model import TranscriptResult
from os import getenv
QUALITY_MAP = {
"fast": "32",
@@ -36,3 +37,15 @@ class Downloader(ABC):
def download_video(self, video_url: str,
output_dir: Union[str, None] = None) -> str:
pass
def download_subtitles(self, video_url: str, output_dir: Optional[str] = None,
                       langs: Optional[list] = None) -> Optional[TranscriptResult]:
    '''
    Try to fetch platform-provided subtitles (manual or auto-generated).

    Base-class default: reports "no subtitles available"; concrete
    downloaders override this to query their platform.

    :param video_url: video URL
    :param output_dir: output directory for any downloaded subtitle files
    :param langs: preferred language codes, e.g. ['zh-Hans', 'zh', 'en']
    :return: TranscriptResult, or None when no subtitles are available
    '''
    # Default: platform subtitles unsupported — callers fall back to
    # audio transcription.
    return None

View File

@@ -1,14 +1,23 @@
import os
import json
import logging
from abc import ABC
from typing import Union, Optional
from typing import Union, Optional, List
from pathlib import Path
import yt_dlp
from app.downloaders.base import Downloader, DownloadQuality, QUALITY_MAP
from app.models.notes_model import AudioDownloadResult
from app.models.transcriber_model import TranscriptResult, TranscriptSegment
from app.utils.path_helper import get_data_dir
from app.utils.url_parser import extract_video_id
logger = logging.getLogger(__name__)
# B站 cookies 文件路径
BILIBILI_COOKIES_FILE = os.getenv("BILIBILI_COOKIES_FILE", "cookies.txt")
class BilibiliDownloader(Downloader, ABC):
def __init__(self):
@@ -111,4 +120,198 @@ class BilibiliDownloader(Downloader, ABC):
os.remove(video_path)
return f"视频文件已删除: {video_path}"
else:
return f"视频文件未找到: {video_path}"
return f"视频文件未找到: {video_path}"
def download_subtitles(self, video_url: str, output_dir: str = None,
                       langs: List[str] = None) -> Optional[TranscriptResult]:
    """
    Fetch subtitles for a Bilibili video via yt-dlp, if any exist.

    :param video_url: video page URL
    :param output_dir: directory for downloaded subtitle files
    :param langs: preferred subtitle language codes, highest priority first
    :return: parsed TranscriptResult, or None when no usable subtitles exist
    """
    if output_dir is None:
        output_dir = get_data_dir() or self.cache_data
    os.makedirs(output_dir, exist_ok=True)
    if langs is None:
        langs = ['zh-Hans', 'zh', 'zh-CN', 'ai-zh', 'en', 'en-US']

    video_id = extract_video_id(video_url, "bilibili")
    opts = {
        'writesubtitles': True,
        'writeautomaticsub': True,
        'subtitleslangs': langs,
        'subtitlesformat': 'srt/json3/best',  # accept several subtitle formats
        'skip_download': True,
        'outtmpl': os.path.join(output_dir, f'{video_id}.%(ext)s'),
        'quiet': True,
    }

    # Attach the cookies file when present — Bilibili subtitle endpoints
    # often require a logged-in session.
    cookie_file = Path(BILIBILI_COOKIES_FILE)
    if not cookie_file.is_absolute():
        # Resolve relative to the backend directory.
        cookie_file = Path(__file__).parent.parent.parent / BILIBILI_COOKIES_FILE
    if cookie_file.exists():
        opts['cookiefile'] = str(cookie_file)
        logger.info(f"使用 cookies 文件: {cookie_file}")
    else:
        logger.warning(f"B站 cookies 文件不存在: {cookie_file},字幕获取可能失败")

    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            meta = ydl.extract_info(video_url, download=True)

        available = meta.get('requested_subtitles') or {}
        if not available:
            logger.info(f"B站视频 {video_id} 没有可用字幕")
            return None

        # First pass: honour the caller's language priority.
        chosen_lang, chosen_sub = None, None
        for lang in langs:
            if lang in available:
                chosen_lang, chosen_sub = lang, available[lang]
                break

        # Second pass: any non-danmaku track will do.
        if not chosen_lang:
            for lang, track in available.items():
                if lang != 'danmaku':  # danmaku is a comment overlay, not speech
                    chosen_lang, chosen_sub = lang, track
                    break

        if not chosen_sub:
            logger.info(f"B站视频 {video_id} 没有可用字幕(排除弹幕)")
            return None

        # yt-dlp sometimes embeds the subtitle payload in the returned info.
        if chosen_sub.get('data'):
            logger.info(f"直接从返回数据解析字幕: {chosen_lang}")
            return self._parse_srt_content(chosen_sub['data'], chosen_lang)

        # Otherwise locate the subtitle file that was written to disk.
        ext = chosen_sub.get('ext', 'srt')
        subtitle_file = os.path.join(output_dir, f"{video_id}.{chosen_lang}.{ext}")
        if not os.path.exists(subtitle_file):
            logger.info(f"字幕文件不存在: {subtitle_file}")
            return None

        # Dispatch on the on-disk format.
        if ext == 'json3':
            return self._parse_json3_subtitle(subtitle_file, chosen_lang)
        with open(subtitle_file, 'r', encoding='utf-8') as fh:
            return self._parse_srt_content(fh.read(), chosen_lang)
    except Exception as e:
        logger.warning(f"获取B站字幕失败: {e}")
        return None
def _parse_srt_content(self, srt_content: str, language: str) -> Optional[TranscriptResult]:
    """
    Parse SRT-formatted subtitle text into a TranscriptResult.

    :param srt_content: raw SRT subtitle text
    :param language: language code recorded on the result
    :return: TranscriptResult, or None when nothing parseable was found
    """
    import re

    # Hoisted out of the match loop: the original re-created this closure
    # on every iteration for no benefit.
    def time_to_seconds(t: str) -> float:
        # "HH:MM:SS,mmm" -> seconds as float
        parts = t.replace(',', '.').split(':')
        return float(parts[0]) * 3600 + float(parts[1]) * 60 + float(parts[2])

    try:
        segments = []
        # SRT cue layout: index line, timing line, text until a blank line
        # (or the next index line / end of input).
        pattern = r'(\d+)\n(\d{2}:\d{2}:\d{2},\d{3})\s*-->\s*(\d{2}:\d{2}:\d{2},\d{3})\n(.*?)(?=\n\n|\n\d+\n|$)'
        matches = re.findall(pattern, srt_content, re.DOTALL)
        for idx, start_time, end_time, text in matches:
            text = text.strip()
            if not text:
                # Skip cues that carry no visible text.
                continue
            segments.append(TranscriptSegment(
                start=time_to_seconds(start_time),
                end=time_to_seconds(end_time),
                text=text
            ))
        if not segments:
            return None
        full_text = ' '.join(seg.text for seg in segments)
        logger.info(f"成功解析B站SRT字幕,{len(segments)} 段")
        return TranscriptResult(
            language=language,
            full_text=full_text,
            segments=segments,
            raw={'source': 'bilibili_subtitle', 'format': 'srt'}
        )
    except Exception as e:
        logger.warning(f"解析SRT字幕失败: {e}")
        return None
def _parse_json3_subtitle(self, subtitle_file: str, language: str) -> Optional[TranscriptResult]:
    """
    Parse a json3 subtitle file (timed-text JSON) into a TranscriptResult.

    :param subtitle_file: path to the json3 file on disk
    :param language: language code recorded on the result
    :return: TranscriptResult, or None when the file holds no usable text
    """
    try:
        with open(subtitle_file, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)

        segments = []
        for event in payload.get('events', []):
            # json3 timings are expressed in milliseconds.
            start_ms = event.get('tStartMs', 0)
            duration_ms = event.get('dDurationMs', 0)
            caption = ''.join(piece.get('utf8', '') for piece in event.get('segs', [])).strip()
            if not caption:
                # Skip events that carry no visible text.
                continue
            segments.append(TranscriptSegment(
                start=start_ms / 1000.0,
                end=(start_ms + duration_ms) / 1000.0,
                text=caption
            ))

        if not segments:
            return None

        full_text = ' '.join(seg.text for seg in segments)
        logger.info(f"成功解析B站字幕,{len(segments)} 段")
        return TranscriptResult(
            language=language,
            full_text=full_text,
            segments=segments,
            raw={'source': 'bilibili_subtitle', 'file': subtitle_file}
        )
    except Exception as e:
        logger.warning(f"解析字幕文件失败: {e}")
        return None

View File

@@ -1,14 +1,19 @@
import os
import json
import logging
from abc import ABC
from typing import Union, Optional
from typing import Union, Optional, List
import yt_dlp
from app.downloaders.base import Downloader, DownloadQuality
from app.models.notes_model import AudioDownloadResult
from app.models.transcriber_model import TranscriptResult, TranscriptSegment
from app.utils.path_helper import get_data_dir
from app.utils.url_parser import extract_video_id
logger = logging.getLogger(__name__)
class YoutubeDownloader(Downloader, ABC):
def __init__(self):
@@ -92,3 +97,119 @@ class YoutubeDownloader(Downloader, ABC):
raise FileNotFoundError(f"视频文件未找到: {video_path}")
return video_path
def download_subtitles(self, video_url: str, output_dir: str = None,
                       langs: List[str] = None) -> Optional[TranscriptResult]:
    """
    Fetch YouTube subtitles via yt-dlp (manual and auto-generated tracks
    are both requested).

    :param video_url: video page URL
    :param output_dir: directory for downloaded subtitle files
    :param langs: preferred subtitle language codes, highest priority first
    :return: parsed TranscriptResult, or None when no usable subtitles exist
    """
    if output_dir is None:
        output_dir = get_data_dir() or self.cache_data
    os.makedirs(output_dir, exist_ok=True)
    if langs is None:
        langs = ['zh-Hans', 'zh', 'zh-CN', 'zh-TW', 'en', 'en-US']

    video_id = extract_video_id(video_url, "youtube")
    opts = {
        'writesubtitles': True,
        'writeautomaticsub': True,
        'subtitleslangs': langs,
        'subtitlesformat': 'json3',
        'skip_download': True,
        'outtmpl': os.path.join(output_dir, f'{video_id}.%(ext)s'),
        'quiet': True,
    }

    try:
        with yt_dlp.YoutubeDL(opts) as ydl:
            meta = ydl.extract_info(video_url, download=True)

        available = meta.get('requested_subtitles') or {}
        if not available:
            logger.info(f"YouTube视频 {video_id} 没有可用字幕")
            return None

        # Prefer the caller's language order; otherwise take any track.
        detected_lang = next((lang for lang in langs if lang in available), None)
        if detected_lang is None:
            detected_lang = next(iter(available), None)

        subtitle_file = None
        if detected_lang is not None:
            subtitle_file = os.path.join(output_dir, f"{video_id}.{detected_lang}.json3")

        if not subtitle_file or not os.path.exists(subtitle_file):
            logger.info(f"字幕文件不存在: {subtitle_file}")
            return None

        return self._parse_json3_subtitle(subtitle_file, detected_lang)
    except Exception as e:
        logger.warning(f"获取YouTube字幕失败: {e}")
        return None
def _parse_json3_subtitle(self, subtitle_file: str, language: str) -> Optional[TranscriptResult]:
    """
    Parse a json3 subtitle file (timed-text JSON) into a TranscriptResult.

    :param subtitle_file: path to the json3 file on disk
    :param language: language code recorded on the result
    :return: TranscriptResult, or None when the file holds no usable text
    """
    try:
        with open(subtitle_file, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)

        segments = []
        for event in payload.get('events', []):
            # json3 timings are expressed in milliseconds.
            start_ms = event.get('tStartMs', 0)
            duration_ms = event.get('dDurationMs', 0)
            caption = ''.join(piece.get('utf8', '') for piece in event.get('segs', [])).strip()
            if not caption:
                # Skip events that carry no visible text.
                continue
            segments.append(TranscriptSegment(
                start=start_ms / 1000.0,
                end=(start_ms + duration_ms) / 1000.0,
                text=caption
            ))

        if not segments:
            return None

        full_text = ' '.join(seg.text for seg in segments)
        logger.info(f"成功解析YouTube字幕,{len(segments)} 段")
        return TranscriptResult(
            language=language,
            full_text=full_text,
            segments=segments,
            raw={'source': 'youtube_subtitle', 'file': subtitle_file}
        )
    except Exception as e:
        logger.warning(f"解析字幕文件失败: {e}")
        return None

View File

@@ -147,11 +147,15 @@ class NoteGenerator:
grid_size=grid_size,
)
# 2. 转写文字
transcript = self._transcribe_audio(
# 2. 获取字幕/转写文字
# 优先尝试获取平台字幕,没有再 fallback 到音频转写
transcript = self._get_transcript(
downloader=downloader,
video_url=video_url,
audio_file=audio_meta.file_path,
transcript_cache_file=transcript_cache_file,
status_phase=TaskStatus.TRANSCRIBING,
task_id=task_id,
)
# 3. GPT 总结
@@ -400,6 +404,62 @@ class NoteGenerator:
raise
def _get_transcript(
    self,
    downloader: Downloader,
    video_url: str,
    audio_file: str,
    transcript_cache_file: Path,
    status_phase: TaskStatus,
    task_id: Optional[str] = None,
) -> TranscriptResult | None:
    """
    Obtain a transcript, preferring platform subtitles over audio transcription.

    Order of attempts:
      1. on-disk cache at ``transcript_cache_file``
      2. platform subtitles via ``downloader.download_subtitles``
      3. fallback: transcription of ``audio_file``

    :param downloader: downloader instance for the video's platform
    :param video_url: video URL
    :param audio_file: audio file path (used by the transcription fallback)
    :param transcript_cache_file: cache file path
    :param status_phase: status enum published while this phase runs
    :param task_id: task id for status updates
    :return: TranscriptResult object
    """
    self._update_status(task_id, status_phase)

    # 1. Serve from cache when a previous run already produced a transcript.
    if transcript_cache_file.exists():
        logger.info(f"检测到转写缓存 ({transcript_cache_file}),尝试读取")
        try:
            cached = json.loads(transcript_cache_file.read_text(encoding="utf-8"))
            return TranscriptResult(
                language=cached.get("language"),
                full_text=cached["full_text"],
                segments=[TranscriptSegment(**seg) for seg in cached.get("segments", [])],
            )
        except Exception as e:
            logger.warning(f"加载转写缓存失败,将重新获取:{e}")

    # 2. Try the platform's own subtitles first.
    logger.info("尝试获取平台字幕...")
    try:
        transcript = downloader.download_subtitles(video_url)
        if transcript and transcript.segments:
            logger.info(f"成功获取平台字幕,共 {len(transcript.segments)} 段")
            # Persist so later runs skip both subtitle fetch and transcription.
            transcript_cache_file.write_text(
                json.dumps(asdict(transcript), ensure_ascii=False, indent=2),
                encoding="utf-8"
            )
            return transcript
        logger.info("平台无可用字幕,将使用音频转写")
    except Exception as e:
        logger.warning(f"获取平台字幕失败: {e},将使用音频转写")

    # 3. Fall back to transcribing the downloaded audio.
    return self._transcribe_audio(
        audio_file=audio_file,
        transcript_cache_file=transcript_cache_file,
        status_phase=status_phase,
    )
def _transcribe_audio(
self,
audio_file: str,