Files
BiliNote/backend/tests/test_task_serial_executor.py
CyanAutumn d9a7b89e7d 🐞 fix: 增加错误之后对已解析段落的缓存功能,再次重试时不再重头开始
解析长视频时,当附件大小过大时不再调用后进行报错,而是将附件进行分批次发送

在每篇笔记开头默认增加地址来源链接,对模糊处可溯源
2026-02-12 18:28:11 +08:00

43 lines
1.3 KiB
Python

import importlib.util
import pathlib
import threading
import time
import unittest
ROOT = pathlib.Path(__file__).resolve().parents[1]
MODULE_PATH = ROOT / "app" / "services" / "task_serial_executor.py"
spec = importlib.util.spec_from_file_location("task_serial_executor", MODULE_PATH)
if spec is None or spec.loader is None:
raise ImportError("task_serial_executor module spec not found")
task_serial_executor = importlib.util.module_from_spec(spec)
spec.loader.exec_module(task_serial_executor)
SerialTaskExecutor = task_serial_executor.SerialTaskExecutor
class TestTaskSerialExecutor(unittest.TestCase):
def test_executor_runs_tasks_one_by_one(self):
executor = SerialTaskExecutor()
state_lock = threading.Lock()
state = {"active": 0, "peak_active": 0}
def critical_work():
with state_lock:
state["active"] += 1
state["peak_active"] = max(state["peak_active"], state["active"])
time.sleep(0.05)
with state_lock:
state["active"] -= 1
threads = [threading.Thread(target=lambda: executor.run(critical_work)) for _ in range(2)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertEqual(state["peak_active"], 1)
if __name__ == "__main__":
unittest.main()