refactor: use watchfiles for directory monitor

This commit is contained in:
jxxghp
2026-05-22 09:14:42 +08:00
parent 23487b7ae0
commit 9319b47fad
3 changed files with 352 additions and 114 deletions

View File

@@ -4,13 +4,13 @@ import re
import threading
import time
import traceback
from dataclasses import dataclass
from pathlib import Path
from threading import Lock
from typing import Any, Optional, Dict, List
from apscheduler.schedulers.background import BackgroundScheduler
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
from watchdog.observers.polling import PollingObserver
from watchfiles import Change, DefaultFilter, watch
from app.chain import ChainBase
from app.chain.storage import StorageChain
@@ -34,29 +34,195 @@ class MonitorChain(ChainBase):
pass
class FileMonitorHandler(FileSystemEventHandler):
@dataclass(frozen=True)
class DirectoryChangeEvent:
"""
目录监控响应类
目录文件变化事件,隔离底层 watchfiles 事件结构。
"""
change_type: Change
src_path: str
is_directory: bool
def __init__(self, mon_path: Path, callback: Any, **kwargs):
super(FileMonitorHandler, self).__init__(**kwargs)
class LocalDirectoryWatcher:
"""
基于 watchfiles 的本地目录监控线程。
"""
_HANDLE_CHANGES = {Change.added, Change.modified}
def __init__(self, mon_path: Path, callback: Any, force_polling: Optional[bool] = None):
"""
初始化本地目录监控。
:param mon_path: 监控目录
:param callback: 目录变化回调对象
:param force_polling: 是否强制使用轮询模式None 表示由 watchfiles 自动选择
"""
self._watch_path = mon_path
self.callback = callback
self._callback = callback
self._force_polling = force_polling
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._watch_filter = DefaultFilter()
def on_created(self, event: FileSystemEvent):
try:
self.callback.event_handler(event=event, text="创建", event_path=event.src_path,
file_size=Path(event.src_path).stat().st_size)
except Exception as e:
logger.error(f"on_created 异常: {e}")
@property
def watch_path(self) -> Path:
"""
获取监控目录。
:return: 监控目录
"""
return self._watch_path
def on_moved(self, event: FileSystemMovedEvent):
def start(self):
"""
启动本地目录监控线程。
"""
if not self._watch_path.exists():
raise FileNotFoundError(f"监控目录不存在: {self._watch_path}")
if not self._watch_path.is_dir():
raise NotADirectoryError(f"监控路径不是目录: {self._watch_path}")
if self.is_alive():
logger.info(f"本地目录监控已在运行中: {self._watch_path}")
return
self._stop_event.clear()
self._thread = threading.Thread(
target=self._run,
name=f"MoviePilot-DirectoryWatcher-{self._watch_path.name}",
daemon=True
)
self._thread.start()
def stop(self):
"""
请求停止本地目录监控线程。
"""
self._stop_event.set()
def join(self, timeout: Optional[float] = None):
"""
等待本地目录监控线程退出。
:param timeout: 最长等待秒数
"""
if self._thread:
self._thread.join(timeout=timeout)
def is_alive(self) -> bool:
"""
判断监控线程是否仍在运行。
:return: 线程存活状态
"""
return bool(self._thread and self._thread.is_alive())
def _run(self):
"""
运行 watchfiles 主循环,并在快速模式不可用时回退到轮询。
"""
try:
self.callback.event_handler(event=event, text="移动", event_path=event.dest_path,
file_size=Path(event.dest_path).stat().st_size)
except Exception as e:
logger.error(f"on_moved 异常: {e}")
self._run_watch(force_polling=self._force_polling)
except Exception as err:
if self._stop_event.is_set():
return
if self._force_polling is True:
logger.error(f"本地目录监控发生错误: {self._watch_path} - {err}")
logger.debug(traceback.format_exc())
return
logger.warn(f"快速模式监控 {self._watch_path} 失败,将自动切换到兼容模式: {err}")
try:
self._run_watch(force_polling=True)
except Exception as fallback_err:
if not self._stop_event.is_set():
logger.error(f"兼容模式监控 {self._watch_path} 仍然失败: {fallback_err}")
logger.debug(traceback.format_exc())
def _run_watch(self, force_polling: Optional[bool]):
"""
执行一次 watchfiles 监控循环。
:param force_polling: 是否强制轮询
"""
for changes in watch(
str(self._watch_path),
watch_filter=self._watch_filter,
stop_event=self._stop_event,
rust_timeout=1000,
yield_on_timeout=True,
force_polling=force_polling,
recursive=True,
ignore_permission_denied=True):
if self._stop_event.is_set():
break
if not changes:
continue
self._handle_changes(changes)
def _handle_changes(self, changes: set[tuple[Change, str]]):
"""
将 watchfiles 原始变更转换为目录监控事件。
:param changes: watchfiles 返回的变更集合
"""
for change_type, path_str in sorted(changes, key=lambda item: item[1]):
if change_type not in self._HANDLE_CHANGES:
continue
event_path = Path(path_str)
event = self._build_event(change_type=change_type, event_path=event_path)
if not event or event.is_directory:
continue
file_size = self._get_file_size(event_path)
if file_size is None:
continue
text = self._change_text(change_type)
try:
self._callback.event_handler(
event=event,
text=text,
event_path=path_str,
file_size=file_size
)
except Exception as err:
logger.error(f"处理本地目录监控事件失败: {path_str} - {err}")
@staticmethod
def _build_event(change_type: Change, event_path: Path) -> Optional[DirectoryChangeEvent]:
"""
构建目录变化事件,路径已不存在时忽略。
:param change_type: watchfiles 变化类型
:param event_path: 变化路径
:return: 目录变化事件
"""
try:
is_directory = event_path.is_dir()
except OSError as err:
logger.debug(f"读取目录监控事件路径失败: {event_path} - {err}")
return None
if not event_path.exists():
return None
return DirectoryChangeEvent(
change_type=change_type,
src_path=event_path.as_posix(),
is_directory=is_directory
)
@staticmethod
def _get_file_size(event_path: Path) -> Optional[int]:
"""
读取事件文件大小,文件已消失时返回 None。
:param event_path: 事件文件路径
:return: 文件大小
"""
try:
return event_path.stat().st_size
except OSError as err:
logger.debug(f"读取目录监控文件大小失败: {event_path} - {err}")
return None
@staticmethod
def _change_text(change_type: Change) -> str:
"""
转换 watchfiles 事件类型为日志文案。
:param change_type: watchfiles 变化类型
:return: 事件描述
"""
if change_type == Change.modified:
return "修改"
return "新增"
class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
@@ -67,10 +233,8 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
def __init__(self):
super().__init__()
# 退出事件
self._event = threading.Event()
# 监控服务
self._observers = []
# 本地目录监控服务
self._watchers = []
# 定时服务
self._scheduler = None
# 存储过照间隔(分钟)
@@ -435,32 +599,25 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
limits=limits)
logger.info(f"监控模式决策: {reason}")
if use_polling:
observer = PollingObserver()
logger.info(f"使用兼容模式(轮询)监控 {mon_path}")
else:
observer = self.__choose_observer()
if observer is None:
logger.warn(f"快速模式不可用,自动切换到兼容模式监控 {mon_path}")
observer = PollingObserver()
else:
logger.info(f"使用快速模式监控 {mon_path}")
if limits['warnings']:
for warning in limits['warnings']:
logger.warn(f"系统限制警告: {warning}")
if limits['max_user_watches'] > 0:
usage_percent = (file_count / limits['max_user_watches']) * 100
logger.info(
f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})")
self._observers.append(observer)
observer.schedule(FileMonitorHandler(mon_path=mon_path, callback=self),
path=str(mon_path),
recursive=True)
observer.daemon = True
observer.start()
mode_name = "兼容模式(轮询)" if use_polling else "快速模式"
logger.info(f"使用{mode_name}监控 {mon_path}")
if not use_polling:
if limits['warnings']:
for warning in limits['warnings']:
logger.warn(f"系统限制警告: {warning}")
if limits['max_user_watches'] > 0:
usage_percent = (file_count / limits['max_user_watches']) * 100
logger.info(
f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})")
watcher = LocalDirectoryWatcher(
mon_path=mon_path,
callback=self,
force_polling=True if use_polling else None
)
self._watchers.append(watcher)
watcher.start()
logger.info(f"✓ 本地目录监控已启动: {mon_path} [{mode_name}]")
except Exception as e:
@@ -521,64 +678,6 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
remote_count = len([d for d in monitor_dirs if d.storage != "local" and d.monitor_type == "monitor"])
logger.info(f"目录监控启动完成: 本地监控 {local_count} 个,远程监控 {remote_count}")
def __choose_observer(self) -> Optional[Any]:
"""
选择最优的监控模式(带错误处理和自动回退)
"""
system = platform.system()
observers_to_try = []
try:
if system == 'Linux':
observers_to_try = [
('InotifyObserver',
lambda: self.__try_import_observer('watchdog.observers.inotify', 'InotifyObserver')),
]
elif system == 'Darwin':
observers_to_try = [
('FSEventsObserver',
lambda: self.__try_import_observer('watchdog.observers.fsevents', 'FSEventsObserver')),
]
elif system == 'Windows':
observers_to_try = [
('WindowsApiObserver',
lambda: self.__try_import_observer('watchdog.observers.read_directory_changes',
'WindowsApiObserver')),
]
# 尝试每个观察者
for observer_name, observer_func in observers_to_try:
try:
observer_class = observer_func()
if observer_class:
# 尝试创建实例以验证是否可用
test_observer = observer_class()
test_observer.stop() # 立即停止测试实例
logger.debug(f"成功初始化 {observer_name}")
return observer_class()
except Exception as e:
logger.debug(f"初始化 {observer_name} 失败: {e}")
continue
except Exception as e:
logger.debug(f"选择观察者时出错: {e}")
logger.debug("所有快速监控模式都不可用,将使用兼容模式")
return None
@staticmethod
def __try_import_observer(module_name: str, class_name: str):
"""
尝试导入观察者类
"""
try:
module = __import__(module_name, fromlist=[class_name])
return getattr(module, class_name)
except (ImportError, AttributeError) as e:
logger.debug(f"导入 {module_name}.{class_name} 失败: {e}")
return None
def polling_observer(self, storage: str, mon_paths: List[Path]):
"""
轮询监控(改进版)
@@ -738,17 +837,19 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
"""
退出监控
"""
self._event.set()
if self._observers:
if self._watchers:
logger.info("正在停止本地目录监控服务...")
for observer in self._observers:
for watcher in self._watchers:
try:
observer.stop()
observer.join()
logger.debug(f"已停止监控服务: {observer}")
watcher.stop()
watcher.join(timeout=5)
if watcher.is_alive():
logger.warning(f"本地目录监控线程在5秒内未能停止: {watcher.watch_path}")
else:
logger.debug(f"已停止本地目录监控服务: {watcher.watch_path}")
except Exception as e:
logger.error(f"停止目录监控服务出现了错误:{e}")
self._observers = []
self._watchers = []
logger.info("本地目录监控服务已停止")
if self._scheduler:
self._scheduler.remove_all_jobs()
@@ -763,4 +864,3 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
self._cache.close()
if self._snapshot_cache:
self._snapshot_cache.close()
self._event.clear()

View File

@@ -48,7 +48,6 @@ starlette~=0.46.2
PyVirtualDisplay~=3.0
psutil~=7.0.0
python-dotenv~=1.1.1
watchdog~=6.0.0
watchfiles~=1.1.0
click~=8.2.1
parse~=1.20.2

View File

@@ -0,0 +1,139 @@
import tempfile
import unittest
from pathlib import Path
from unittest.mock import MagicMock
from watchfiles import Change
from app.monitor import DirectoryChangeEvent, LocalDirectoryWatcher, Monitor
class CallbackRecorder:
"""
测试用目录监控回调记录器。
"""
def __init__(self):
"""
初始化事件记录列表。
"""
self.events = []
def event_handler(self, event, text: str, event_path: str, file_size: int = None):
"""
记录目录监控分发出来的事件。
:param event: 目录监控事件
:param text: 事件描述
:param event_path: 事件路径
:param file_size: 文件大小
"""
self.events.append((event, text, event_path, file_size))
class LocalDirectoryWatcherTest(unittest.TestCase):
"""
watchfiles 本地目录监控测试。
"""
def test_handle_changes_dispatches_added_and_modified_files(self):
"""
新增和修改文件应转换成目录监控整理回调。
"""
with tempfile.TemporaryDirectory() as temp_dir:
watch_dir = Path(temp_dir)
added_file = watch_dir / "a_added.mkv"
modified_file = watch_dir / "b_modified.mkv"
skipped_dir = watch_dir / "c_dir"
added_file.write_bytes(b"added")
modified_file.write_bytes(b"modified")
skipped_dir.mkdir()
callback = CallbackRecorder()
watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True)
watcher._handle_changes({
(Change.added, added_file.as_posix()),
(Change.modified, modified_file.as_posix()),
(Change.deleted, added_file.as_posix()),
(Change.added, skipped_dir.as_posix()),
})
self.assertEqual(2, len(callback.events))
self.assertEqual((Change.added, "新增", added_file.as_posix(), 5),
(callback.events[0][0].change_type,
callback.events[0][1],
callback.events[0][2],
callback.events[0][3]))
self.assertEqual((Change.modified, "修改", modified_file.as_posix(), 8),
(callback.events[1][0].change_type,
callback.events[1][1],
callback.events[1][2],
callback.events[1][3]))
def test_handle_changes_skips_missing_paths(self):
"""
事件到达时已经消失的路径不应触发整理。
"""
with tempfile.TemporaryDirectory() as temp_dir:
watch_dir = Path(temp_dir)
missing_file = watch_dir / "missing.mkv"
callback = CallbackRecorder()
watcher = LocalDirectoryWatcher(watch_dir, callback=callback, force_polling=True)
watcher._handle_changes({(Change.added, missing_file.as_posix())})
self.assertEqual([], callback.events)
class MonitorWatchfilesEventTest(unittest.TestCase):
"""
Monitor 对 watchfiles 事件的兼容处理测试。
"""
def test_event_handler_routes_file_events_to_transfer_handler(self):
"""
文件事件应继续按 local 存储交给整理流程。
"""
monitor = object.__new__(Monitor)
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/movie.mkv")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=False
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix(),
file_size=1024
)
handle_file.assert_called_once_with(
storage="local",
event_path=event_path,
file_size=1024
)
def test_event_handler_ignores_directory_events(self):
"""
目录事件不应进入文件整理流程。
"""
monitor = object.__new__(Monitor)
handle_file = MagicMock()
setattr(monitor, "_Monitor__handle_file", handle_file)
event_path = Path("/downloads/folder")
event = DirectoryChangeEvent(
change_type=Change.added,
src_path=event_path.as_posix(),
is_directory=True
)
monitor.event_handler(
event=event,
text="新增",
event_path=event_path.as_posix()
)
handle_file.assert_not_called()