mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-20 23:14:32 +08:00
318 lines
11 KiB
Python
318 lines
11 KiB
Python
"""Agent 资源流程工具权限测试。"""
|
|
|
|
import asyncio
|
|
import json
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, patch
|
|
|
|
from app.agent.tools.impl.edit_file import EditFileTool
|
|
from app.agent.tools.impl.list_directory import ListDirectoryTool
|
|
from app.agent.tools.impl.query_downloaders import QueryDownloadersTool
|
|
from app.agent.tools.impl.query_sites import QuerySitesTool
|
|
from app.agent.tools.impl.read_file import ReadFileTool
|
|
from app.agent.tools.impl.send_local_file import SendLocalFileTool
|
|
from app.agent.tools.impl.write_file import WriteFileTool
|
|
from app.agent.tools.manager import MoviePilotToolsManager
|
|
from app.agent import MoviePilotAgent
|
|
from app.core.config import settings
|
|
from app.schemas.types import MessageChannel
|
|
|
|
|
|
def test_non_admin_manager_exposes_resource_flow_helper_tools():
    """Non-admin users should see the helper tools used by the resource flow.

    The tool factory is patched to return a site-query tool and a
    downloader-query tool; a manager built with ``is_admin=False`` must still
    list both of them by name.
    """
    identity = {"session_id": "session-1", "user_id": "10001"}
    fake_tools = [
        QuerySitesTool(**identity),
        QueryDownloadersTool(**identity),
    ]
    factory_target = "app.agent.tools.manager.MoviePilotToolFactory.create_tools"

    with patch(factory_target, return_value=fake_tools):
        manager = MoviePilotToolsManager(is_admin=False)

    exposed_names = {tool.name for tool in manager.list_tools()}
    assert "query_sites" in exposed_names
    assert "query_downloaders" in exposed_names
|
|
|
|
|
|
def test_non_admin_manager_exposes_restricted_file_tools():
    """Non-admin users should see the directory-bounded file tools.

    The tool factory is patched to return the read, write, edit and
    list-directory tools; all four names must appear in the listing of a
    manager built with ``is_admin=False``.
    """
    tool_classes = (ReadFileTool, WriteFileTool, EditFileTool, ListDirectoryTool)
    file_tools = [
        tool_cls(session_id="session-1", user_id="10001")
        for tool_cls in tool_classes
    ]

    with patch(
        "app.agent.tools.manager.MoviePilotToolFactory.create_tools",
        return_value=file_tools,
    ):
        manager = MoviePilotToolsManager(is_admin=False)

    visible_names = {tool.name for tool in manager.list_tools()}
    required_names = {"read_file", "write_file", "edit_file", "list_directory"}
    assert required_names.issubset(visible_names)
|
|
|
|
|
|
def test_query_sites_hides_only_sensitive_fields_for_non_admin_user():
    """Non-admin site queries hide only the cookie, API key, token and RSS.

    ``SiteOper`` is patched so that ``async_list`` yields a single fake site.
    The JSON returned by the tool must contain every other field of that site
    unchanged, and none of the four sensitive ones.
    """
    site_fields = {
        "id": 1,
        "name": "TestSite",
        "domain": "secret.example",
        "url": "https://secret.example/",
        "pri": 1,
        "rss": "https://secret.example/rss",
        "cookie": "uid=1; passkey=secret",
        "ua": "SecretUA",
        "apikey": "site-api-key",
        "token": "site-token",
        "proxy": 1,
        "filter": "",
        "render": 0,
        "public": 0,
        "note": {"secret": True},
        "limit_interval": 0,
        "limit_count": 0,
        "limit_seconds": 0,
        "timeout": 15,
        "is_active": True,
        "downloader": "qb",
    }
    fake_site = SimpleNamespace(**site_fields)
    query_tool = QuerySitesTool(session_id="session-1", user_id="10001")

    with patch("app.agent.tools.impl.query_sites.SiteOper") as site_oper_cls:
        site_oper_cls.return_value.async_list = AsyncMock(return_value=[fake_site])
        raw_output = asyncio.run(query_tool.run())

    payload = json.loads(raw_output)
    expected_entry = {
        "id": 1,
        "name": "TestSite",
        "domain": "secret.example",
        "url": "https://secret.example/",
        "pri": 1,
        "is_active": True,
        "downloader": "qb",
        "ua": "SecretUA",
        "proxy": 1,
        "filter": "",
        "render": 0,
        "public": 0,
        "note": {"secret": True},
        "limit_interval": 0,
        "limit_count": 0,
        "limit_seconds": 0,
        "timeout": 15,
    }
    assert payload == [expected_entry]

    # Spell out the sensitive keys explicitly so a failure names the leak.
    for hidden_key in ("cookie", "rss", "token", "apikey"):
        assert hidden_key not in payload[0]
|
|
|
|
|
|
def test_query_sites_keeps_full_fields_for_admin_context():
    """Admin site queries keep the full configuration view.

    With ``is_admin`` set in the agent context, the cookie, token, API key and
    URL of the fake site must all be present in the tool's JSON output.
    """
    query_tool = QuerySitesTool(session_id="session-1", user_id="admin")
    query_tool.set_agent_context({"is_admin": True})

    site_fields = {
        "id": 1,
        "name": "TestSite",
        "domain": "secret.example",
        "url": "https://secret.example/",
        "pri": 1,
        "rss": "https://secret.example/rss",
        "cookie": "uid=1; passkey=secret",
        "ua": "SecretUA",
        "apikey": "site-api-key",
        "token": "site-token",
        "proxy": 1,
        "filter": "",
        "render": 0,
        "public": 0,
        "note": {"secret": True},
        "limit_interval": 0,
        "limit_count": 0,
        "limit_seconds": 0,
        "timeout": 15,
        "is_active": True,
        "downloader": "qb",
    }
    fake_site = SimpleNamespace(**site_fields)

    with patch("app.agent.tools.impl.query_sites.SiteOper") as site_oper_cls:
        site_oper_cls.return_value.async_list = AsyncMock(return_value=[fake_site])
        raw_output = asyncio.run(query_tool.run())

    payload = json.loads(raw_output)
    first_entry = payload[0]
    assert first_entry["cookie"] == "uid=1; passkey=secret"
    assert first_entry["token"] == "site-token"
    assert first_entry["apikey"] == "site-api-key"
    assert first_entry["url"] == "https://secret.example/"
|
|
|
|
|
|
def test_non_admin_file_tools_can_access_config_directory(tmp_path, monkeypatch):
    """Non-admin users can write, read and edit files inside the config dir.

    ``settings.CONFIG_DIR`` is redirected to a temporary directory, then a
    memory file below ``settings.CONFIG_PATH`` is written, read back and
    edited through the three file tools.
    """
    monkeypatch.setattr(settings, "CONFIG_DIR", str(tmp_path / "config"))
    memory_file = settings.CONFIG_PATH / "agent" / "memory" / "MEMORY.md"
    target = str(memory_file)

    identity = {"session_id": "session-1", "user_id": "10001"}
    writer = WriteFileTool(**identity)
    reader = ReadFileTool(**identity)
    editor = EditFileTool(**identity)

    # Run the whole write -> read -> edit sequence before asserting anything.
    write_result = asyncio.run(writer.run(target, "hello"))
    read_result = asyncio.run(reader.run(target))
    edit_result = asyncio.run(editor.run(target, "hello", "hello mp"))
    final_content = memory_file.read_text(encoding="utf-8")

    assert "成功写入文件" in write_result
    assert read_result == "hello"
    assert "成功编辑文件" in edit_result
    assert final_content == "hello mp"
|
|
|
|
|
|
def test_non_admin_file_tools_block_paths_outside_allowed_roots(
    tmp_path, monkeypatch
):
    """Non-admin file tools must refuse paths outside the allowed roots.

    A file is created next to (not inside) the redirected config directory.
    Reading, writing, editing, listing and sending it as a non-admin user must
    each return a refusal message, leave the file untouched, and never reach
    the directory-listing implementation.
    """
    forbidden_file = tmp_path / "outside.txt"
    forbidden_file.write_text("secret", encoding="utf-8")
    monkeypatch.setattr(settings, "CONFIG_DIR", str(tmp_path / "config"))
    forbidden = str(forbidden_file)

    identity = {"session_id": "session-1", "user_id": "10001"}
    reader = ReadFileTool(**identity)
    writer = WriteFileTool(**identity)
    editor = EditFileTool(**identity)
    lister = ListDirectoryTool(**identity)
    sender = SendLocalFileTool(**identity)
    sender.set_message_attr(
        channel=MessageChannel.Telegram.value,
        source="telegram-main",
        username="normal-user",
    )

    read_result = asyncio.run(reader.run(forbidden))
    write_result = asyncio.run(writer.run(forbidden, "changed"))
    edit_result = asyncio.run(editor.run(forbidden, "secret", "changed"))
    # Patch the listing implementation so the test can prove it is never hit.
    with patch.object(ListDirectoryTool, "_list_directory_sync") as listing_impl:
        list_result = asyncio.run(lister.run(str(tmp_path)))
        send_result = asyncio.run(sender.run(forbidden))

    refusals = (
        ("普通用户只能读取", read_result),
        ("普通用户只能写入", write_result),
        ("普通用户只能编辑", edit_result),
        ("普通用户只能列出", list_result),
        ("普通用户只能发送", send_result),
    )
    for expected_fragment, tool_output in refusals:
        assert expected_fragment in tool_output

    assert forbidden_file.read_text(encoding="utf-8") == "secret"
    listing_impl.assert_not_called()
|
|
|
|
|
|
def test_admin_file_tool_can_access_paths_outside_allowed_roots(
    tmp_path, monkeypatch
):
    """An admin context is not bound by the non-admin file access limits.

    With ``is_admin`` set in the agent context, the write tool must succeed on
    a file that lives outside the redirected config directory.
    """
    monkeypatch.setattr(settings, "CONFIG_DIR", str(tmp_path / "config"))
    target_file = tmp_path / "outside.txt"

    writer = WriteFileTool(session_id="session-1", user_id="admin")
    writer.set_agent_context({"is_admin": True})

    outcome = asyncio.run(writer.run(str(target_file), "admin write"))

    assert "成功写入文件" in outcome
    assert target_file.read_text(encoding="utf-8") == "admin write"
|
|
|
|
|
|
def test_query_downloaders_hides_sensitive_fields_for_non_admin_user():
    """Non-admin downloader queries return only the safe selection fields.

    ``SystemConfigOper`` is patched so that ``get`` returns one downloader
    configuration. The tool's JSON output must contain only its name, type and
    enabled flag; host and credential fields must be absent.
    """
    query_tool = QueryDownloadersTool(session_id="session-1", user_id="10001")
    stored_config = [
        {
            "name": "qb",
            "type": "qbittorrent",
            "enabled": True,
            "host": "http://127.0.0.1",
            "port": 8080,
            "username": "admin",
            "password": "secret",
            "apikey": "downloader-api-key",
            "token": "downloader-token",
        }
    ]

    with patch(
        "app.agent.tools.impl.query_downloaders.SystemConfigOper"
    ) as config_oper_cls:
        config_oper_cls.return_value.get.return_value = stored_config
        raw_output = asyncio.run(query_tool.run())

    payload = json.loads(raw_output)
    safe_view = {"name": "qb", "type": "qbittorrent", "enabled": True}
    assert payload == [safe_view]

    # Spell out the sensitive keys explicitly so a failure names the leak.
    for hidden_key in ("host", "username", "password", "apikey", "token"):
        assert hidden_key not in payload[0]
|
|
|
|
|
|
def test_query_downloaders_keeps_full_fields_for_admin_context():
    """Admin downloader queries keep the full configuration view.

    With ``is_admin`` set in the agent context, the host, username, password
    and API key of the stored downloader must appear in the tool's JSON output.
    """
    query_tool = QueryDownloadersTool(session_id="session-1", user_id="admin")
    query_tool.set_agent_context({"is_admin": True})
    stored_config = [
        {
            "name": "qb",
            "type": "qbittorrent",
            "enabled": True,
            "host": "http://127.0.0.1",
            "username": "admin",
            "password": "secret",
            "apikey": "downloader-api-key",
        }
    ]

    with patch(
        "app.agent.tools.impl.query_downloaders.SystemConfigOper"
    ) as config_oper_cls:
        config_oper_cls.return_value.get.return_value = stored_config
        raw_output = asyncio.run(query_tool.run())

    payload = json.loads(raw_output)
    first_entry = payload[0]
    assert first_entry["host"] == "http://127.0.0.1"
    assert first_entry["username"] == "admin"
    assert first_entry["password"] == "secret"
    assert first_entry["apikey"] == "downloader-api-key"
|
|
|
|
|
|
def test_channel_agent_admin_user_id_does_not_bypass_user_lookup():
    """A channel user ID of "admin" must not bypass the real user lookup.

    The agent is created for a Telegram channel user whose ID happens to be
    ``"admin"``, while the patched ``UserOper`` reports a non-superuser. The
    tool context built by the agent must have ``is_admin`` set to ``False``.
    """
    agent = MoviePilotAgent(
        session_id="session-1",
        user_id="admin",
        channel=MessageChannel.Telegram.value,
        source="telegram-main",
        username="normal-user",
    )
    non_superuser = SimpleNamespace(is_superuser=False)

    with patch("app.agent.UserOper") as user_oper_cls:
        # NOTE(review): this stubs `async_get_by_name` with a plain mock return
        # value rather than an AsyncMock; confirm that matches how the agent
        # calls it (an awaited call would need an awaitable result).
        user_oper_cls.return_value.async_get_by_name.return_value = non_superuser
        context = asyncio.run(
            agent._build_tool_context(should_dispatch_reply=True)
        )

    assert context["is_admin"] is False
|