feat(app): 添加日志记录功能

- 新增 logger 模块,用于全局日志记录
- 在关键位置添加日志输出,包括事件处理、文件清理、FFmpeg 检查等
- 优化数据库操作,增加日志记录
- 在主程序启动时添加日志输出
This commit is contained in:
Jefferyhcool
2025-04-14 19:50:44 +08:00
parent 43b88c85fa
commit 1ebf236f4f
9 changed files with 165 additions and 66 deletions

View File

@@ -1,8 +1,13 @@
from .sqlite_client import get_connection
from app.utils.logger import get_logger
logger = get_logger(__name__)
def init_video_task_table():
conn = get_connection()
if conn is None:
logger.error("Failed to connect to the database.")
return
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS video_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -12,41 +17,62 @@ def init_video_task_table():
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
try:
conn.commit()
conn.close()
logger.info("video_tasks table created successfully.")
except Exception as e:
logger.error(f"Failed to create video_tasks table: {e}")
def insert_video_task(video_id: str, platform: str, task_id: str):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO video_tasks (video_id, platform, task_id)
VALUES (?, ?, ?)
""", (video_id, platform, task_id))
conn.commit()
conn.close()
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
INSERT INTO video_tasks (video_id, platform, task_id)
VALUES (?, ?, ?)
""", (video_id, platform, task_id))
conn.commit()
conn.close()
logger.info(f"Video task inserted successfully."
f"video_id: {video_id}"
f"platform: {platform}"
f"task_id: {task_id}")
except Exception as e:
logger.error(f"Failed to insert video task: {e}")
def get_task_by_video(video_id: str, platform: str):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT task_id FROM video_tasks
WHERE video_id = ? AND platform = ?
ORDER BY created_at DESC
LIMIT 1
""", (video_id, platform))
result = cursor.fetchone()
conn.close()
return result[0] if result else None
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT task_id FROM video_tasks
WHERE video_id = ? AND platform = ?
ORDER BY created_at DESC
LIMIT 1
""", (video_id, platform))
result = cursor.fetchone()
conn.close()
if result is None:
logger.info(f"No task found for video_id: {video_id} and platform: {platform}")
logger.info(f"Task found for video_id: {video_id} and platform: {platform}")
return result[0] if result else None
except Exception as e:
logger.error(f"Failed to get task by video: {e}")
def delete_task_by_video(video_id: str, platform: str):
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
DELETE FROM video_tasks
WHERE video_id = ? AND platform = ?
""", (video_id, platform))
try:
conn = get_connection()
cursor = conn.cursor()
cursor.execute("""
DELETE FROM video_tasks
WHERE video_id = ? AND platform = ?
""", (video_id, platform))
conn.commit()
conn.close()
conn.commit()
conn.close()
logger.info(f"Task deleted for video_id: {video_id} and platform: {platform}")
except Exception as e:
logger.error(f"Failed to delete task by video: {e}")

View File

@@ -32,13 +32,6 @@ class YoutubeDownloader(Downloader, ABC):
ydl_opts = {
'format': 'bestaudio[ext=m4a]/bestaudio/best',
'outtmpl': output_path,
'postprocessors': [
{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '64',
}
],
'noplaylist': True,
'quiet': False,
}

View File

@@ -28,13 +28,15 @@ from app.utils.video_helper import generate_screenshot
# from app.services.whisperer import transcribe_audio
# from app.services.gpt import summarize_text
from dotenv import load_dotenv
from app.utils.logger import get_logger
logger = get_logger(__name__)
load_dotenv()
BACKEND_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:8000")
output_dir = os.getenv('OUT_DIR')
image_base_url = os.getenv('IMAGE_BASE_URL')
print(output_dir)
logger.info("starting up")
class NoteGenerator:
@@ -47,25 +49,35 @@ class NoteGenerator:
self.provider = os.getenv('MODEl_PROVIDER','openai')
self.video_path = None
logger.info("初始化NoteGenerator")
def get_gpt(self) -> GPT:
if self.provider == 'openai':
logger.info("使用OpenAI")
return OpenaiGPT()
elif self.provider == 'deepSeek':
logger.info("使用DeepSeek")
return DeepSeekGPT()
elif self.provider == 'qwen':
logger.info("使用Qwen")
return QwenGPT()
else:
logger.warning("不支持的AI提供商")
raise ValueError(f"不支持的AI提供商{self.provider}")
def get_downloader(self, platform: str) -> Downloader:
if platform == "bilibili":
logger.info("下载 Bilibili 平台视频")
return BilibiliDownloader()
elif platform == "youtube":
logger.info("下载 YouTube 平台视频")
return YoutubeDownloader()
elif platform == 'douyin':
logger.info("下载 Douyin 平台视频")
return DouyinDownloader()
else:
logger.warning("不支持的平台")
raise ValueError(f"不支持的平台:{platform}")
def get_transcriber(self) -> Transcriber:
@@ -75,11 +87,14 @@ class NoteGenerator:
:return:
'''
if self.transcriber_type == 'fast-whisper':
logger.info("使用Whisper")
return get_transcriber()
else:
logger.warning("不支持的转义器")
raise ValueError(f"不支持的转义器:{self.transcriber}")
def save_meta(self, video_id, platform, task_id):
logger.info(f"记录已经生成的数据信息")
insert_video_task(video_id=video_id, platform=platform, task_id=task_id)
def insert_screenshots_into_markdown(self, markdown: str, video_path: str, image_base_url: str,
@@ -91,18 +106,23 @@ class NoteGenerator:
"""
matches = self.extract_screenshot_timestamps(markdown)
new_markdown = markdown
logger.info(f"开始为笔记生成截图")
try:
for idx, (marker, ts) in enumerate(matches):
image_path = generate_screenshot(video_path, output_dir, ts, idx)
image_relative_path = os.path.join(image_base_url, os.path.basename(image_path)).replace("\\", "/")
image_url = f"{BACKEND_BASE_URL.rstrip('/')}/{image_relative_path.lstrip('/')}"
replacement = f"![]({image_url})"
new_markdown = new_markdown.replace(marker, replacement, 1)
for idx, (marker, ts) in enumerate(matches):
image_path = generate_screenshot(video_path, output_dir, ts, idx)
image_relative_path = os.path.join(image_base_url, os.path.basename(image_path)).replace("\\", "/")
image_url = f"{BACKEND_BASE_URL.rstrip('/')}/{image_relative_path.lstrip('/')}"
replacement = f"![]({image_url})"
new_markdown = new_markdown.replace(marker, replacement, 1)
return new_markdown
return new_markdown
except Exception as e:
logger.error(f"截图生成失败:{e}")
raise e
@staticmethod
def delete_note(video_id: str, platform: str):
logger.info(f"删除生成的笔记记录")
return delete_task_by_video(video_id, platform)
import re
@@ -112,6 +132,7 @@ class NoteGenerator:
从 Markdown 中提取 Screenshot 时间标记(如 *Screenshot-03:39 或 Screenshot-[03:39]
并返回匹配文本和对应时间戳(秒)
"""
logger.info(f"开始提取截图时间标记")
pattern = r"(?:\*Screenshot-(\d{2}):(\d{2})|Screenshot-\[(\d{2}):(\d{2})\])"
matches = list(re.finditer(pattern, markdown))
results = []
@@ -134,12 +155,15 @@ class NoteGenerator:
path: Union[str, None] = None
) -> NoteResult:
logger.info(f"开始解析并生成笔记")
# 1. 选择下载器
downloader = self.get_downloader(platform)
gpt = self.get_gpt()
logger.info(f'使用{downloader.__class__.__name__}下载器\n'
f'使用{gpt.__class__.__name__}GPT\n'
f'视频地址:{video_url}')
if screenshot:
video_path = downloader.download_video(video_url)
self.video_path = video_path
print(video_path)
@@ -152,10 +176,10 @@ class NoteGenerator:
need_video=screenshot
)
logger.info(f"下载音频成功,文件路径:{audio.file_path}")
# 3. Whisper 转写
transcript: TranscriptResult = self.transcriber.transcript(file_path=audio.file_path)
logger.info(f"Whisper 转写成功,转写结果:{transcript.full_text}")
# 4. GPT 总结
source = GPTSource(
title=audio.title,
@@ -164,6 +188,7 @@ class NoteGenerator:
screenshot=screenshot,
link=link
)
logger.info(f"GPT 总结完成,总结结果:{source}")
markdown: str = gpt.summarize(source)
print("markdown结果", markdown)
@@ -179,6 +204,3 @@ class NoteGenerator:
)
if __name__ == '__main__':
note = NoteGenerator()
print(note.audio_meta)

View File

@@ -1,11 +1,19 @@
from app.transcriber.whisper import WhisperTranscriber
print('实例化transcriber')
from app.utils.logger import get_logger
logger = get_logger(__name__)
logger.info('实例化transcriber')
# TODO:后面需要加入逻辑选择
_transcriber = None
def get_transcriber(model_size="base", device="cuda"):
global _transcriber
if _transcriber is None:
print('加载_transcriber')
_transcriber = WhisperTranscriber(model_size=model_size, device=device)
logger.info('不存在 transcriber 开始实例化transcriber。')
try:
_transcriber = WhisperTranscriber(model_size=model_size, device=device)
logger.info(f'实例化transcriber成功。参数{model_size}, {device} ')
except Exception as e:
logger.error(f"实例化transcriber失败请检查是否安装whisper。{e}")
return _transcriber

View File

@@ -0,0 +1,32 @@
import logging
import sys
from pathlib import Path
# 日志目录
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)
# 日志格式
formatter = logging.Formatter(
fmt="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# 控制台输出
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# 文件输出
file_handler = logging.FileHandler(LOG_DIR / "app.log", encoding="utf-8")
file_handler.setFormatter(formatter)
# 获取日志器
def get_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.propagate = False
return logger

View File

@@ -1,8 +1,14 @@
# 注册监听器
from app.utils.logger import get_logger
from events.handlers import cleanup_temp_files
from events.signals import transcription_finished
logger = get_logger(__name__)
def register_handler():
transcription_finished.connect(cleanup_temp_files)
try:
transcription_finished.connect(cleanup_temp_files)
logger.info("注册监听器成功")
except Exception as e:
logger.error(f"注册监听器失败:{e}")

View File

@@ -1,8 +1,14 @@
import os
from app.utils.logger import get_logger
logger = get_logger(__name__)
def cleanup_temp_files(data):
print(f"🧹 清理转写文件{data['file_path']}")
logger.info(f"starting cleanup temp files {data['file_path']}")
os.remove(data['file_path'])
# 检查是否删除文件
if os.path.exists(data['file_path']):
logger.info(f"cleanup temp files failed {data['file_path']}")
else:
logger.info(f"cleanup temp files success {data['file_path']}")

View File

@@ -1,22 +1,26 @@
import os
import subprocess
from dotenv import load_dotenv
load_dotenv()
from app.utils.logger import get_logger
logger = get_logger(__name__)
load_dotenv()
def check_ffmpeg_exists() -> bool:
"""
检查 ffmpeg 是否可用。优先使用 FFMPEG_BIN_PATH 环境变量指定的路径。
"""
ffmpeg_bin_path = os.getenv("FFMPEG_BIN_PATH")
print(f"FFMPEG_BIN_PATH: {ffmpeg_bin_path}")
logger.info(f"FFMPEG_BIN_PATH: {ffmpeg_bin_path}")
if ffmpeg_bin_path and os.path.isdir(ffmpeg_bin_path):
os.environ["PATH"] = ffmpeg_bin_path + os.pathsep + os.environ.get("PATH", "")
logger.info(f"ffmpeg 未配置路径尝试使用系统路径PATH: {os.environ.get('PATH')}")
try:
subprocess.run(["ffmpeg", "-version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
logger.info("ffmpeg 已安装")
return True
except (FileNotFoundError, OSError, subprocess.CalledProcessError):
logger.info("ffmpeg 未安装")
return False
@@ -25,6 +29,7 @@ def ensure_ffmpeg_or_raise():
校验 ffmpeg 是否可用,否则抛出异常并提示安装方式。
"""
if not check_ffmpeg_exists():
logger.error("未检测到 ffmpeg请先安装后再使用本功能。")
raise EnvironmentError(
"❌ 未检测到 ffmpeg请先安装后再使用本功能。\n"
"👉 下载地址https://ffmpeg.org/download.html\n"

View File

@@ -3,13 +3,14 @@ import os
import uvicorn
from starlette.staticfiles import StaticFiles
from dotenv import load_dotenv
from app.utils.logger import get_logger
from app import create_app
from app.db.video_task_dao import init_video_task_table
from app.transcriber.transcriber_provider import get_transcriber
from events import register_handler
from ffmpeg_helper import ensure_ffmpeg_or_raise
logger = get_logger(__name__)
load_dotenv()
# 读取 .env 中的路径
@@ -38,6 +39,6 @@ async def startup_event():
if __name__ == "__main__":
port = int(os.getenv("BACKEND_PORT", 8000))
host = os.getenv("BACKEND_HOST", "0.0.0.0")
logger.info(f"Starting server on {host}:{port}")
uvicorn.run("main:app", host=host, port=port, reload=True)