feat(events): 实现转写完成后的文件清理功能

- 新增 events 模块,包括 handlers 和 signals 子模块
- 在 handlers 中实现 cleanup_temp_files 函数,用于清理转写临时文件
- 在 signals 中定义 transcription_finished 信号
- 修改 main.py,添加 startup_event 函数以注册事件处理器
- 更新 WhisperTranscriber 类,增加 on_finish 方法并发送转写完成信号
- 在 base.py 中添加 Transcriber 基类的 on_finish 方法占位符
This commit is contained in:
Jefferyhcool
2025-04-13 23:29:33 +08:00
parent 595a38723f
commit 43b88c85fa
7 changed files with 62 additions and 26 deletions

0
backend/__init__.py Normal file
View File

View File

@@ -11,4 +11,13 @@ class Transcriber(ABC):
:param file_path:音频路径
:return: 返回一个 TranscriptResult 类
'''
pass
def on_finish(self, video_path: str, result: TranscriptResult) -> None:
    """Hook called when an audio transcription has completed.

    This implementation is a placeholder that performs no work.

    :param video_path: path of the video
    :param result: the recognition result
    :return: None
    """
    return None

View File

@@ -6,6 +6,8 @@ from app.transcriber.base import Transcriber
from app.utils.env_checker import is_cuda_available, is_torch_installed
from app.utils.path_helper import get_model_dir
from events import transcription_finished
'''
Size of the model to use (tiny, tiny.en, base, base.en, small, small.en, distil-small.en, medium, medium.en, distil-medium.en, large-v1, large-v2, large-v3, large, distil-large-v2, distil-large-v3, large-v3-turbo, or turbo
'''
@@ -64,29 +66,37 @@ class WhisperTranscriber(Transcriber):
@timeit
def transcript(self, file_path: str) -> TranscriptResult:
try:
segments_raw, info = self.model.transcribe(file_path)
segments_raw, info = self.model.transcribe(file_path)
segments = []
full_text = ""
segments = []
full_text = ""
for seg in segments_raw:
text = seg.text.strip()
full_text += text + " "
segments.append(TranscriptSegment(
start=seg.start,
end=seg.end,
text=text
))
for seg in segments_raw:
text = seg.text.strip()
full_text += text + " "
segments.append(TranscriptSegment(
start=seg.start,
end=seg.end,
text=text
))
return TranscriptResult(
language=info.language,
full_text=full_text.strip(),
segments=segments,
raw=info
)
result= TranscriptResult(
language=info.language,
full_text=full_text.strip(),
segments=segments,
raw=info
)
self.on_finish(file_path, result)
return result
except Exception as e:
print(f"转写失败:{e}")
if __name__ == '__main__':
print(WhisperTranscriber(cpu_threads=8).transcript(
'''D:\\data_backup_from_ssd\\02_个人项目\\11_BiliNote\\backend\\data\\BV1vcZ5YQE9X.mp3'''))
def on_finish(self, video_path: str, result: TranscriptResult) -> None:
    """Report completion and emit the ``transcription_finished`` signal.

    Prints a completion message, then sends the signal with a dict that
    carries the transcribed file's path under the ``"file_path"`` key.
    ``result`` is accepted but not used here.

    :param video_path: path that was transcribed
    :param result: the transcription result (unused)
    :return: None
    """
    print("转写完成")
    # The dict is passed positionally, i.e. as the signal's sender argument.
    payload = {"file_path": video_path}
    transcription_finished.send(payload)

View File

@@ -0,0 +1,8 @@
# 注册监听器
from events.handlers import cleanup_temp_files
from events.signals import transcription_finished
def register_handler():
    """Connect ``cleanup_temp_files`` to the ``transcription_finished`` signal."""
    listener = cleanup_temp_files
    transcription_finished.connect(listener)

View File

@@ -0,0 +1,8 @@
import os
def cleanup_temp_files(data):
    """Delete the transcription file named by ``data['file_path']``.

    Cleanup is best-effort: a file that is already gone, or that cannot be
    removed (for example because it is still locked), is reported and then
    ignored. This matters because the handler runs inside the signal's
    ``send`` call, so an exception raised here would propagate into the
    sender's code path.

    :param data: mapping that contains the key ``"file_path"``
    :return: None
    :raises KeyError: if ``data`` has no ``"file_path"`` entry
    """
    file_path = data['file_path']
    print(f"🧹 清理转写文件:{file_path}")
    try:
        os.remove(file_path)
    except FileNotFoundError:
        # Nothing left to delete; the goal of the cleanup is already met.
        print(f"转写文件不存在,跳过清理:{file_path}")
    except OSError as exc:
        # Permission or lock errors must not break the caller that emitted the signal.
        print(f"清理转写文件失败:{exc}")

View File

@@ -0,0 +1,2 @@
from blinker import signal
# Named blinker signal for the "transcription_finished" event.
transcription_finished = signal("transcription_finished")

View File

@@ -7,6 +7,7 @@ from dotenv import load_dotenv
from app import create_app
from app.db.video_task_dao import init_video_task_table
from app.transcriber.transcriber_provider import get_transcriber
from events import register_handler
from ffmpeg_helper import ensure_ffmpeg_or_raise
load_dotenv()
@@ -26,15 +27,13 @@ if not os.path.exists(out_dir):
app = create_app()
app.mount(static_path, StaticFiles(directory=static_dir), name="static")
async def startup_event():
register_handler()
@app.on_event("startup")
def check_env():
async def startup_event():
register_handler()
ensure_ffmpeg_or_raise()
@app.on_event("startup")
async def load_model_on_startup():
get_transcriber()
@app.on_event("startup")
def startup():
init_video_task_table()
if __name__ == "__main__":