Files
MoviePilot/tests/test_skill_scripts_security.py

144 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import importlib.util
import json
from pathlib import Path
from types import ModuleType
from typing import Any
# Directory two levels above this file (this module sits one directory below it).
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Skill scripts under test. Their file names contain hyphens, so they are
# referenced by filesystem path rather than by an importable module name.
MP_API_SCRIPT = PROJECT_ROOT.joinpath(
    "skills", "moviepilot-api", "scripts", "mp-api.py"
)
MP_DB_SCRIPT = PROJECT_ROOT.joinpath(
    "skills", "database-operation", "scripts", "mp-db.py"
)
def _load_script(path: Path, module_name: str) -> ModuleType:
    """Load a skill script as a module directly from its file path.

    The scripts under test have hyphenated file names (e.g. ``mp-api.py``),
    so they are loaded through an explicit import spec instead of a regular
    ``import`` statement.

    Args:
        path: Location of the script file to execute.
        module_name: Name given to the resulting module object.

    Returns:
        The executed module.

    Raises:
        AssertionError: If no import spec or loader can be created for
            ``path`` (for example, the file suffix is not a recognised
            Python source/bytecode suffix).
    """
    spec = importlib.util.spec_from_file_location(module_name, path)
    # Validate the spec *before* handing it to module_from_spec(); otherwise
    # a None spec surfaces as an opaque AttributeError instead of this check.
    assert spec is not None and spec.loader is not None, (
        f"cannot create an import spec for {path}"
    )
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
def test_mp_api_uses_settings_without_prompt_token(monkeypatch, tmp_path) -> None:
    """The API script should read settings directly instead of needing a prompt-supplied token."""
    api_script = _load_script(MP_API_SCRIPT, "mp_api_script")

    temp_dir = tmp_path / "temp"
    temp_dir.mkdir()

    class FakeSettings:
        """Provides the settings fields used for the API script's local configuration."""

        TEMP_PATH = temp_dir
        HOST = "0.0.0.0"
        PORT = 3001
        API_TOKEN = "settings-token"

    def skip_project_import() -> None:
        return None

    def config_from_file() -> tuple[str, str]:
        return ("http://file-host", "file-token")

    monkeypatch.setattr(api_script, "_ensure_project_import", skip_project_import)
    # The file-based config returns different values from FakeSettings, so the
    # assertions below only pass when the settings values are the ones chosen.
    monkeypatch.setattr(api_script, "read_config", config_from_file)
    monkeypatch.setattr("app.core.config.settings", FakeSettings, raising=False)

    # Make sure no environment override is present while resolving.
    for env_name in ("MP_HOST", "MP_API_KEY"):
        monkeypatch.delenv(env_name, raising=False)

    resolved_host, resolved_key = api_script.resolve_config()

    # HOST "0.0.0.0" with PORT 3001 is expected to resolve to a loopback URL.
    assert resolved_host == "http://127.0.0.1:3001"
    assert resolved_key == "settings-token"
def test_mp_db_rejects_write_statement_without_write_flag() -> None:
    """The database script must reject write operations by default."""
    db_script = _load_script(MP_DB_SCRIPT, "mp_db_script")
    is_supported = db_script._is_supported_statement

    # A plain read is accepted when the write flag is off.
    assert is_supported("SELECT COUNT(*) FROM downloadhistory", False)

    # A bare write, and a write appended after a read, are both refused.
    refused_statements = (
        "UPDATE subscribe SET state='S' WHERE id=1",
        "SELECT COUNT(*) FROM downloadhistory; DELETE FROM downloadhistory WHERE id=1",
    )
    for statement in refused_statements:
        assert not is_supported(statement, False)
def test_mp_db_returns_sensitive_columns_without_masking(monkeypatch, capsys) -> None:
    """The database script should return sensitive columns verbatim for the Agent's internal use."""
    db_script = _load_script(MP_DB_SCRIPT, "mp_db_script_unmasked")

    class FakeRow:
        """Mimics a SQLAlchemy result row."""

        _mapping = {"token": "raw-token", "password": "raw-password"}

    class FakeResult:
        """Mimics a SQLAlchemy Result that yields rows."""

        returns_rows = True

        def fetchall(self) -> list[FakeRow]:
            """Return the single test row."""
            return [FakeRow()]

    class FakeConnection:
        """Mimics a database connection."""

        def execute(self, statement: Any) -> FakeResult:
            """Return the canned result for any statement."""
            return FakeResult()

    class FakeTransaction:
        """Mimics the context manager returned by ``engine.begin()``."""

        def __enter__(self) -> FakeConnection:
            """Hand out a connection when the context is entered."""
            return FakeConnection()

        def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> bool:
            """Leave the context without suppressing exceptions."""
            return False

    class FakeEngine:
        """Mimics a database engine."""

        def begin(self) -> FakeTransaction:
            """Return the transaction context."""
            return FakeTransaction()

    # Calling the class with no arguments builds a fresh fake engine.
    monkeypatch.setattr(db_script, "_build_engine", FakeEngine)

    exit_code = db_script.run_query("SELECT token, password FROM site LIMIT 1")
    assert exit_code == 0

    # The JSON written to stdout must carry the raw values, unmasked.
    printed = capsys.readouterr().out
    payload = json.loads(printed)
    assert payload["rows"] == [{"token": "raw-token", "password": "raw-password"}]
def test_mp_db_write_command_allows_write_statement(monkeypatch) -> None:
    """The database script's ``write`` subcommand should allow write statements directly."""
    db_script = _load_script(MP_DB_SCRIPT, "mp_db_script_write")
    recorded = []

    def fake_run_query(sql: str, *, limit: int = 100, allow_write: bool = False) -> int:
        """Record the arguments the write subcommand passes through."""
        recorded.append(dict(sql=sql, limit=limit, allow_write=allow_write))
        return 0

    monkeypatch.setattr(db_script, "run_query", fake_run_query)

    # Simulate the command line: mp-db.py write "<statement>"
    cli_args = ["mp-db.py", "write", "UPDATE subscribe SET state = 'S' WHERE id = 123"]
    monkeypatch.setattr(db_script.sys, "argv", cli_args)

    exit_code = db_script.main()
    assert exit_code == 0

    # Exactly one call is expected, with writes enabled and limit 0.
    expected_call = {
        "sql": "UPDATE subscribe SET state = 'S' WHERE id = 123",
        "limit": 0,
        "allow_write": True,
    }
    assert recorded == [expected_call]