mirror of
https://github.com/JefferyHcool/BiliNote.git
synced 2026-05-07 06:22:45 +08:00
43 lines
1.3 KiB
Python
43 lines
1.3 KiB
Python
import importlib.util
|
|
import pathlib
|
|
import threading
|
|
import time
|
|
import unittest
|
|
|
|
|
|
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
|
MODULE_PATH = ROOT / "app" / "services" / "task_serial_executor.py"
|
|
spec = importlib.util.spec_from_file_location("task_serial_executor", MODULE_PATH)
|
|
if spec is None or spec.loader is None:
|
|
raise ImportError("task_serial_executor module spec not found")
|
|
task_serial_executor = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(task_serial_executor)
|
|
SerialTaskExecutor = task_serial_executor.SerialTaskExecutor
|
|
|
|
|
|
class TestTaskSerialExecutor(unittest.TestCase):
|
|
def test_executor_runs_tasks_one_by_one(self):
|
|
executor = SerialTaskExecutor()
|
|
state_lock = threading.Lock()
|
|
state = {"active": 0, "peak_active": 0}
|
|
|
|
def critical_work():
|
|
with state_lock:
|
|
state["active"] += 1
|
|
state["peak_active"] = max(state["peak_active"], state["active"])
|
|
time.sleep(0.05)
|
|
with state_lock:
|
|
state["active"] -= 1
|
|
|
|
threads = [threading.Thread(target=lambda: executor.run(critical_work)) for _ in range(2)]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
self.assertEqual(state["peak_active"], 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|