import importlib.util import pathlib import re import sys import tempfile import types import unittest from unittest.mock import patch def _install_stubs(): app_mod = types.ModuleType("app") utils_pkg = types.ModuleType("app.utils") logger_mod = types.ModuleType("app.utils.logger") class _Logger: @staticmethod def info(*_args, **_kwargs): return None @staticmethod def warning(*_args, **_kwargs): return None @staticmethod def error(*_args, **_kwargs): return None def _get_logger(_name): return _Logger() logger_mod.get_logger = _get_logger path_helper_mod = types.ModuleType("app.utils.path_helper") ffmpeg_mod = types.ModuleType("ffmpeg") pil_mod = types.ModuleType("PIL") pil_image_mod = types.ModuleType("PIL.Image") pil_draw_mod = types.ModuleType("PIL.ImageDraw") pil_font_mod = types.ModuleType("PIL.ImageFont") class _FakeImage: pass class _FakeImageDraw: @staticmethod def Draw(*_args, **_kwargs): return None class _FakeImageFont: @staticmethod def truetype(*_args, **_kwargs): return None @staticmethod def load_default(): return None pil_image_mod.Image = _FakeImage pil_draw_mod.ImageDraw = _FakeImageDraw pil_font_mod.ImageFont = _FakeImageFont def _get_app_dir(name): return name path_helper_mod.get_app_dir = _get_app_dir ffmpeg_mod.probe = lambda *_args, **_kwargs: {"format": {"duration": "0"}} sys.modules.setdefault("app", app_mod) sys.modules.setdefault("app.utils", utils_pkg) sys.modules["PIL"] = pil_mod sys.modules["PIL.Image"] = pil_image_mod sys.modules["PIL.ImageDraw"] = pil_draw_mod sys.modules["PIL.ImageFont"] = pil_font_mod sys.modules["ffmpeg"] = ffmpeg_mod sys.modules["app.utils.logger"] = logger_mod sys.modules["app.utils.path_helper"] = path_helper_mod def _load_video_reader_module(): _install_stubs() root = pathlib.Path(__file__).resolve().parents[1] module_path = root / "app" / "utils" / "video_reader.py" spec = importlib.util.spec_from_file_location("video_reader", module_path) if spec is None or spec.loader is None: raise ImportError("video_reader module spec not found") module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) return module video_reader_module = _load_video_reader_module() VideoReader = video_reader_module.VideoReader def _make_fake_ffmpeg_runner(colors_by_second): def _runner(cmd, check=True): output_path = next((arg for arg in cmd if isinstance(arg, str) and arg.endswith(".jpg")), None) if output_path is None: raise AssertionError("Output path not found in ffmpeg cmd") match = re.search(r"frame_(\d{2})_(\d{2})\.jpg$", output_path) if match is None: raise AssertionError("Unexpected output path") sec = int(match.group(1)) * 60 + int(match.group(2)) payload = colors_by_second[sec] with open(output_path, "wb") as f: f.write(payload) return 0 return _runner class TestVideoReaderDeduplicateFrames(unittest.TestCase): def test_extract_frames_skips_adjacent_duplicates_when_enabled(self): with tempfile.TemporaryDirectory() as tmp_dir: frame_dir = pathlib.Path(tmp_dir) / "frames" grid_dir = pathlib.Path(tmp_dir) / "grids" reader = VideoReader( video_path="dummy.mp4", frame_interval=1, frame_dir=str(frame_dir), grid_dir=str(grid_dir), ) fake_colors = { 0: b"frame-a", 1: b"frame-a", 2: b"frame-b", 3: b"frame-b", } with patch.object(video_reader_module.ffmpeg, "probe", return_value={"format": {"duration": "4"}}), \ patch.object(video_reader_module.subprocess, "run", side_effect=_make_fake_ffmpeg_runner(fake_colors)): paths = reader.extract_frames(max_frames=10) names = [pathlib.Path(p).name for p in paths] self.assertEqual(names, ["frame_00_00.jpg", "frame_00_02.jpg"]) if __name__ == "__main__": unittest.main()