diff --git a/DouYinSparkFlow/core/tasks.py b/DouYinSparkFlow/core/tasks.py index a6288e0..379c550 100644 --- a/DouYinSparkFlow/core/tasks.py +++ b/DouYinSparkFlow/core/tasks.py @@ -794,10 +794,44 @@ def task_run_lock(): lock_path = Path("logs/task.run.lock") lock_path.parent.mkdir(parents=True, exist_ok=True) - try: - handle = lock_path.open("x", encoding="utf-8") - except FileExistsError as exc: - raise RuntimeError("another task run is already in progress") from exc + def _lock_owner_is_alive(pid): + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + while True: + try: + handle = lock_path.open("x", encoding="utf-8") + break + except FileExistsError as exc: + raw_pid = lock_path.read_text(encoding="utf-8", errors="ignore").strip() + stale_pid = None + try: + stale_pid = int(raw_pid) + except (TypeError, ValueError): + stale_pid = None + + if stale_pid is not None and not _lock_owner_is_alive(stale_pid): + logger.warning("Removing stale task lock owned by missing pid=%s", stale_pid) + try: + lock_path.unlink() + except FileNotFoundError: + pass + continue + + if stale_pid is None: + logger.warning("Removing unreadable stale task lock with contents=%r", raw_pid) + try: + lock_path.unlink() + except FileNotFoundError: + pass + continue + + raise RuntimeError("another task run is already in progress") from exc try: handle.write(f"{os.getpid()}\n")