Files
BiliNote/backend/app/utils/note_helper.py
思诺特 3784e15670 feat(core): 实现 Celery 任务异步生成笔记
- 新增 Celery 配置文件 celery_app.py
- 创建 note_tasks.py 文件,定义生成笔记的 Celery 任务
- 修改 note_router,使用 Celery 任务异步处理笔记生成
- 重构 bili_downloader 和 youtube_downloader,支持多质量选择和错误处理
- 更新 .env.example,添加 Celery 配置项
2025-04-15 12:19:14 +08:00

41 lines
1.4 KiB
Python

import json
import os
import re
import re
import re
from dataclasses import asdict
# Directory where save_note_to_file() writes one "<task_id>.json" file per note.
# It is a relative path, so it resolves against the process's working directory.
NOTE_OUTPUT_DIR = "note_results"
# Matches ``Content-MM:SS`` and ``Content-[MM:SS]``, optionally wrapped in
# Markdown emphasis asterisks.  The closing ``*`` is consumed only when an
# opening ``*`` was matched, so an asterisk that merely starts unrelated
# emphasis right after a bare marker is left untouched.
_CONTENT_MARKER_RE = re.compile(
    r"(?P<star>\*)?Content-"
    r"(?:\[(?P<bm>\d{2}):(?P<bs>\d{2})\]|(?P<pm>\d{2}):(?P<ps>\d{2}))"
    r"(?(star)\*?)"
)


def replace_content_markers(markdown: str, video_id: str, platform: str = 'bilibili') -> str:
    """Replace ``Content-MM:SS`` timestamp markers with links into the video.

    Three marker spellings are recognised: ``*Content-04:16*``,
    ``Content-04:16`` and ``Content-[04:16]``.  Each one is replaced by a
    Markdown link whose target is the video on the given platform.

    Args:
        markdown: Markdown text that may contain timestamp markers.
        video_id: Platform-specific video identifier, inserted verbatim
            into the generated URL.
        platform: ``'bilibili'`` and ``'youtube'`` produce links that jump
            to the timestamp; ``'douyin'`` produces a link to the video
            without a time offset; any other value produces the plain text
            ``(MM:SS)`` instead of a link.

    Returns:
        The text with every marker replaced; text without markers is
        returned unchanged.
    """

    def replacer(match):
        # Exactly one of the bracketed / plain alternatives has matched.
        mm = match.group("bm") or match.group("pm")
        ss = match.group("bs") or match.group("ps")
        total_seconds = int(mm) * 60 + int(ss)

        if platform == 'bilibili':
            url = f"https://www.bilibili.com/video/{video_id}?t={total_seconds}"
        elif platform == 'youtube':
            url = f"https://www.youtube.com/watch?v={video_id}&t={total_seconds}s"
        elif platform == 'douyin':
            # The douyin URL carries no time offset.
            url = f"https://www.douyin.com/video/{video_id}"
        else:
            # Unknown platform: keep the timestamp readable, but do not link.
            return f"({mm}:{ss})"

        return f"[原片 @ {mm}:{ss}]({url})"

    return _CONTENT_MARKER_RE.sub(replacer, markdown)
def save_note_to_file(task_id: str, note):
    """Write a note to ``<NOTE_OUTPUT_DIR>/<task_id>.json`` as UTF-8 JSON.

    The output directory is created if it does not exist, and an existing
    file for the same ``task_id`` is overwritten.

    Args:
        task_id: Identifier used as the output file's base name.
        note: Dataclass instance; it is converted with
            ``dataclasses.asdict`` before serialization.
    """
    os.makedirs(NOTE_OUTPUT_DIR, exist_ok=True)
    destination = os.path.join(NOTE_OUTPUT_DIR, f"{task_id}.json")
    with open(destination, mode="w", encoding="utf-8") as handle:
        # ensure_ascii=False keeps non-ASCII text as-is instead of \u escapes.
        json.dump(asdict(note), handle, indent=2, ensure_ascii=False)