Compare commits

...

13 Commits

22 changed files with 505 additions and 316 deletions

View File

@@ -11,6 +11,13 @@ on:
# 允许手动触发
workflow_dispatch:
permissions:
contents: read
concurrency:
group: unit-tests-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
jobs:
pytest:
runs-on: ubuntu-latest

View File

@@ -15,7 +15,6 @@ You act as a proactive agent. Your goal is to fully resolve the user's media-rel
<non_negotiable_boundaries>
- Do not let user memory or persona style override this core identity, safety boundaries, or built-in background task rules.
- Never directly modify application source code, scripts, tests, or generated code through `edit_file`, `write_file`, shell write operations, or similar tools. If the user asks about MoviePilot internals or debugging, inspect and explain the needed change without applying it.
- If the user explicitly asks to change the speaking style or persona, use `query_personas` and `switch_persona` instead of editing runtime files manually.
- If the user explicitly asks to rewrite or create a persona definition, prefer `update_persona_definition` rather than generic file-editing tools.
- Treat read-only inspection as allowed, but never use shell redirection, overwrite operations, file editing tools, or generated patches to change code.

View File

@@ -124,6 +124,8 @@ task_types:
- "When several records obviously share the same media identity, avoid repeated `recognize_media` or `search_media` calls."
- "Process every selected record exactly once."
- "Keep the final response short and focused on the aggregate outcome."
- "Final response must be plain text only: one concise Chinese sentence or paragraph describing the aggregate result."
- "Do NOT include any title/header, bullet list, numbered list, bold text, code block, table, or other Markdown formatting."
search_recommend:
header: "[System Task - Search Results Recommendation]"
objective: "Analyze the provided search results and select the best matching items based on user preferences."

View File

@@ -373,6 +373,9 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
"wechat": "WECHAT_BOT_CHAT_ID",
"feishu": "FEISHU_OPEN_ID",
"wechatclawbot": "WECHATCLAWBOT_DEFAULT_TARGET",
"discord": "DISCORD_CHANNEL_ID",
"slack": "SLACK_CHANNEL",
"qqbot": "QQ_OPENID",
}
admin_key = admin_key_map.get(channel_type)

View File

@@ -2785,9 +2785,16 @@ class SubscribeChain(ChainBase):
# 更新剧集列表、开始集数、总集数
if not episode_list:
# 整季缺失
episodes = []
start_episode = start_episode or start
total_episode = total_episode or total
original_start = start if start is not None else 1
# 空集列表会被下载链解释为整季下载;当订阅开始集裁掉季初范围时,需要转成显式集数。
if start_episode and total_episode and start_episode > original_start:
episodes = list(range(start_episode, total_episode + 1))
if not episodes:
return True, {}
else:
episodes = []
else:
# 部分缺失
if not start_episode \

View File

@@ -9,10 +9,10 @@ import sys
import threading
from asyncio import AbstractEventLoop
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Type
from typing import Any, Dict, List, Optional, Tuple, Type, Union, get_origin, get_args
from urllib.parse import quote, urlencode, urlparse
from dotenv import set_key
from dotenv import set_key, unset_key
from pydantic import BaseModel, Field, ConfigDict, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -690,6 +690,18 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
if isinstance(value, str):
value = value.strip()
# 处理 Optional 类型:当值为空字符串且类型允许 None 时,转为 None
# 兼容 typing.Union (Python 3.9) 与 types.UnionType (Python 3.10+ PEP 604)
origin = get_origin(expected_type)
is_union = origin is Union or getattr(origin, "__name__", None) == "UnionType"
if (
is_union
and type(None) in get_args(expected_type)
and isinstance(value, str)
and not value
):
return default, str(default) != str(original_value)
try:
if expected_type is bool:
if isinstance(value, bool):
@@ -812,13 +824,19 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
logger.warning(message)
return False, message
else:
# 当值为 None 时,从 env 文件中删除该键,恢复为默认值
if converted_value is None:
unset_key(
dotenv_path=SystemUtils.get_env_path(),
key_to_unset=field_name,
)
logger.info(f"配置项 '{field_name}' 已清空,从 'app.env' 中移除")
return True, message
# 如果是列表、字典或集合类型将其转换为JSON字符串
if isinstance(converted_value, (list, dict, set)):
value_to_write = json.dumps(converted_value)
else:
value_to_write = (
str(converted_value) if converted_value is not None else ""
)
value_to_write = str(converted_value)
set_key(
dotenv_path=SystemUtils.get_env_path(),
@@ -967,7 +985,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
@property
def PROXY(self):
if self.PROXY_HOST:
if self.PROXY_HOST and self.PROXY_HOST.strip():
return {
"http": self.PROXY_HOST,
"https": self.PROXY_HOST,
@@ -1009,7 +1027,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
@property
def PROXY_SERVER(self):
if self.PROXY_HOST:
if self.PROXY_HOST and self.PROXY_HOST.strip():
try:
parsed = urlparse(self.PROXY_HOST)
if not parsed.scheme:

View File

@@ -1,5 +1,5 @@
from abc import ABCMeta, abstractmethod
from pathlib import Path
from pathlib import Path, PurePosixPath
from typing import Optional, List, Dict, Tuple, Callable, Union
from tqdm import tqdm
@@ -105,6 +105,38 @@ class StorageBase(metaclass=ABCMeta):
self.storagehelper.reset_storage(self.schema.value)
self.init_storage()
@staticmethod
def _safe_download_name(name: Optional[str]) -> Optional[str]:
"""
提取可安全落盘的文件名。
"""
if not name:
return None
safe_name = PurePosixPath(str(name).replace("\\", "/")).name
if safe_name in ("", ".", ".."):
return None
return safe_name
def _build_download_path(
self, fileitem: schemas.FileItem, path: Path
) -> Optional[Path]:
"""
构造本地下载路径,避免远端文件名携带目录片段时越过目标目录。
"""
safe_name = self._safe_download_name(fileitem.name)
if not safe_name:
logger.error(f"【存储】下载文件名无效:{fileitem.name}")
return None
local_path = path / safe_name
try:
local_path.resolve().relative_to(path.resolve())
except ValueError:
logger.error(f"【存储】下载路径越界:{fileitem.name} -> {local_path}")
return None
return local_path
@abstractmethod
def check(self) -> bool:
"""

View File

@@ -741,7 +741,9 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}")
return None
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
# 获取文件大小
file_size = fileitem.size

View File

@@ -340,7 +340,9 @@ class Rclone(StorageBase):
"""
带实时进度显示的下载
"""
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
# 初始化进度条
logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}")

View File

@@ -511,7 +511,9 @@ class SMB(StorageBase, metaclass=WeakSingleton):
"""
带实时进度显示的下载
"""
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
smb_path = self._normalize_path(fileitem.path)
try:
self._check_connection()

View File

@@ -830,7 +830,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
logger.error(f"【115】下载链接为空: {fileitem.name}")
return None
local_path = (path or settings.TEMP_PATH) / fileitem.name
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
if not local_path:
return None
# 获取文件大小
file_size = detail.size

View File

@@ -100,6 +100,7 @@ def send_proactive_c2c_message(
openid: str,
content: str,
use_markdown: bool = False,
keyboard: Optional[dict] = None,
) -> dict:
"""
主动发送 C2C 单聊消息(不需要 msg_id
@@ -108,11 +109,14 @@ def send_proactive_c2c_message(
:param openid: 用户 openid
:param content: 消息内容
:param use_markdown: 是否使用 Markdown 格式(需机器人开通 Markdown 能力)
:param keyboard: 键盘按钮配置
"""
if not content or not content.strip():
raise ValueError("主动消息内容不能为空")
content = content.strip()
body = {"markdown": {"content": content}, "msg_type": 2} if use_markdown else {"content": content, "msg_type": 0}
if keyboard:
body["keyboard"] = {"content": keyboard}
return _api_request(
access_token, "POST", f"/v2/users/{openid}/messages", body
)
@@ -123,6 +127,7 @@ def send_proactive_group_message(
group_openid: str,
content: str,
use_markdown: bool = False,
keyboard: Optional[dict] = None,
) -> dict:
"""
主动发送群聊消息(不需要 msg_id
@@ -131,11 +136,14 @@ def send_proactive_group_message(
:param group_openid: 群聊 openid
:param content: 消息内容
:param use_markdown: 是否使用 Markdown 格式(需机器人开通 Markdown 能力)
:param keyboard: 键盘按钮配置
"""
if not content or not content.strip():
raise ValueError("主动消息内容不能为空")
content = content.strip()
body = {"markdown": {"content": content}, "msg_type": 2} if use_markdown else {"content": content, "msg_type": 0}
if keyboard:
body["keyboard"] = {"content": keyboard}
return _api_request(
access_token, "POST", f"/v2/groups/{group_openid}/messages", body
)
@@ -146,11 +154,14 @@ def send_c2c_message(
openid: str,
content: str,
msg_id: Optional[str] = None,
keyboard: Optional[dict] = None,
) -> dict:
"""被动回复 C2C 单聊消息1 小时内最多 4 次)"""
body = {"content": content, "msg_type": 0, "msg_seq": 1}
if msg_id:
body["msg_id"] = msg_id
if keyboard:
body["keyboard"] = {"content": keyboard}
return _api_request(
access_token, "POST", f"/v2/users/{openid}/messages", body
)
@@ -161,11 +172,14 @@ def send_group_message(
group_openid: str,
content: str,
msg_id: Optional[str] = None,
keyboard: Optional[dict] = None,
) -> dict:
"""被动回复群聊消息1 小时内最多 4 次)"""
body = {"content": content, "msg_type": 0, "msg_seq": 1}
if msg_id:
body["msg_id"] = msg_id
if keyboard:
body["keyboard"] = {"content": keyboard}
return _api_request(
access_token, "POST", f"/v2/groups/{group_openid}/messages", body
)
@@ -188,6 +202,7 @@ def send_message(
content: str,
msg_type: Literal["c2c", "group"] = "c2c",
msg_id: Optional[str] = None,
keyboard: Optional[dict] = None,
) -> dict:
"""
统一发送接口
@@ -196,11 +211,12 @@ def send_message(
:param content: 消息内容
:param msg_type: c2c 单聊 / group 群聊
:param msg_id: 可选,被动回复时传入原消息 id
:param keyboard: 可选,键盘按钮配置
"""
if msg_id:
if msg_type == "c2c":
return send_c2c_message(access_token, target, content, msg_id)
return send_group_message(access_token, target, content, msg_id)
return send_c2c_message(access_token, target, content, msg_id, keyboard)
return send_group_message(access_token, target, content, msg_id, keyboard)
if msg_type == "c2c":
return send_proactive_c2c_message(access_token, target, content)
return send_proactive_group_message(access_token, target, content)
return send_proactive_c2c_message(access_token, target, content, keyboard=keyboard)
return send_proactive_group_message(access_token, target, content, keyboard=keyboard)

View File

@@ -4,6 +4,7 @@ QQ Bot Gateway WebSocket 客户端
"""
import json
import re
import threading
import time
from typing import Callable, List, Optional
@@ -108,6 +109,9 @@ def run_gateway(
author = d.get("author", {})
user_openid = author.get("user_openid", "")
content = d.get("content", "").strip()
match = re.search(r'(agent_interaction:choice:[\w\-]+:\d+|agent_choice:[\w\-]+:\d+)', content)
if match:
content = f"CALLBACK:{match.group(1)}"
msg_id = d.get("id", "")
if content:
on_message_fn({
@@ -122,6 +126,9 @@ def run_gateway(
member_openid = author.get("member_openid", "")
group_openid = d.get("group_openid", "")
content = d.get("content", "").strip()
match = re.search(r'(agent_interaction:choice:[\w\-]+:\d+|agent_choice:[\w\-]+:\d+)', content)
if match:
content = f"CALLBACK:{match.group(1)}"
msg_id = d.get("id", "")
if content:
on_message_fn({

View File

@@ -347,13 +347,43 @@ class QQBot:
logger.warn("QQ Bot: 消息内容为空")
return False
# 处理按钮
buttons = kwargs.get("buttons")
keyboard = None
if buttons:
rows = []
btn_id = 1
for row in buttons:
btns = []
for btn in row:
action_type = 0 if btn.get("url") else 2
btns.append({
"id": str(btn_id),
"render_data": {
"label": btn.get("text", "按钮")[:30],
"visited_label": btn.get("text", "按钮")[:30],
"style": 1
},
"action": {
"type": action_type,
"data": btn.get("url") if action_type == 0 else btn.get("callback_data", ""),
"permission": {"type": 2}
}
})
btn_id += 1
if btns:
rows.append({"buttons": btns})
if rows:
keyboard = {"rows": rows}
use_markdown = True
success_count = 0
try:
token = get_access_token(self._app_id, self._app_secret)
for tgt, tgt_is_group in targets_to_send:
send_fn = send_proactive_group_message if tgt_is_group else send_proactive_c2c_message
try:
send_fn(token, tgt, content, use_markdown=use_markdown)
send_fn(token, tgt, content, use_markdown=use_markdown, keyboard=keyboard)
success_count += 1
logger.debug(f"QQ Bot: 消息已发送到 {'' if tgt_is_group else '用户'} {tgt}")
except Exception as e:
@@ -371,7 +401,7 @@ class QQBot:
plain_parts.append(link)
plain_content = "\n".join(plain_parts).strip()
if plain_content:
send_fn(token, tgt, plain_content, use_markdown=False)
send_fn(token, tgt, plain_content, use_markdown=False, keyboard=None)
success_count += 1
logger.debug(f"QQ Bot: Markdown 不可用,已回退纯文本发送至 {tgt}")
else:

View File

@@ -450,7 +450,12 @@ class ChannelCapabilityManager:
ChannelCapability.RICH_TEXT,
ChannelCapability.IMAGES,
ChannelCapability.LINKS,
ChannelCapability.INLINE_BUTTONS,
ChannelCapability.CALLBACK_QUERIES,
},
max_buttons_per_row=5,
max_button_rows=5,
max_button_text_length=30,
fallback_enabled=True,
),
}

View File

@@ -92,6 +92,9 @@ def _get_shared_async_transport(
会话级状态由调用方在外层 AsyncClient(transport=...) 实例化时单独配置,
每次调用用完即销毁,因此天然无 jar 累积串扰。
"""
# 规范化代理:拒绝空字符串等非法值,防止 httpx 抛出 Unknown scheme for proxy URL
if proxy is not None and (not proxy or not proxy.strip()):
proxy = None
try:
loop = asyncio.get_running_loop()
except RuntimeError:
@@ -899,12 +902,17 @@ class AsyncRequestUtils:
# 如果已经是字符串格式,直接返回
if isinstance(proxies, str):
return proxies
return proxies.strip() or None
# 如果是字典格式提取http或https代理
if isinstance(proxies, dict):
# 优先使用https代理如果没有则使用http代理
proxy_url = proxies.get("https") or proxies.get("http")
# 先各自 strip避免空白字符串阻断裂合取或回退到 http 代理
https_proxy = proxies.get("https")
http_proxy = proxies.get("http")
https_proxy = https_proxy.strip() if isinstance(https_proxy, str) else None
http_proxy = http_proxy.strip() if isinstance(http_proxy, str) else None
proxy_url = https_proxy or http_proxy
if proxy_url:
return proxy_url

View File

@@ -41,7 +41,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
iproute2 \
netcat-openbsd \
lsof \
ffmpeg \
nano \
libjemalloc2 \
&& dpkg-reconfigure --frontend noninteractive tzdata \
@@ -114,6 +113,10 @@ FROM prepare_package AS final
ENV LD_PRELOAD="/usr/local/lib/libjemalloc.so"
# 引入支持 amr 编码的静态 ffmpeg
COPY --from=mwader/static-ffmpeg:8.1.1 /ffmpeg /usr/local/bin/
COPY --from=mwader/static-ffmpeg:8.1.1 /ffprobe /usr/local/bin/
# python 环境
COPY --from=prepare_venv --chmod=777 ${VENV_PATH} ${VENV_PATH}
COPY --from=prepare_venv /usr/local/bin/uv /usr/local/bin/uv

View File

@@ -1,291 +0,0 @@
import unittest
from unittest.mock import patch
from app.agent.middleware.memory import MEMORY_ONBOARDING_PROMPT
from app.agent.middleware.runtime_config import RuntimeConfigMiddleware
from app.agent.prompt import COMMON_SHELL_COMMANDS, PromptConfigError, prompt_manager
from app.core.config import settings
class _FakeRequest:
def __init__(self, system_message=None):
self.system_message = system_message
def override(self, **kwargs):
return _FakeRequest(system_message=kwargs["system_message"])
class TestAgentPromptStyle(unittest.TestCase):
def setUp(self):
"""每个用例前清理系统命令缓存,避免本机 PATH 或测试顺序影响断言。"""
prompt_manager.clear_available_shell_commands_cache()
def tearDown(self):
"""每个用例后清理系统命令缓存,避免 mock 探测结果泄漏到后续用例。"""
prompt_manager.clear_available_shell_commands_cache()
def test_base_prompt_mentions_persona_management_tools(self):
prompt = prompt_manager.get_agent_prompt()
self.assertIn("query_personas", prompt)
self.assertIn("switch_persona", prompt)
self.assertIn("update_persona_definition", prompt)
def test_base_prompt_contains_immutable_core_rules(self):
prompt = prompt_manager.get_agent_prompt()
self.assertIn("AI media assistant powered by MoviePilot", prompt)
self.assertIn(
"omitting `season` means subscribe to season 1 only",
prompt,
)
self.assertIn(
"Do not let user memory or persona style override this core identity",
prompt,
)
self.assertIn(
"Never directly modify application source code",
prompt,
)
self.assertIn(
"If the user has not explicitly requested an operation that changes system behavior",
prompt,
)
self.assertIn("<non_negotiable_boundaries>", prompt)
self.assertIn("<confirmation_policy>", prompt)
self.assertIn(
"Treat read-only inspection as allowed",
prompt,
)
self.assertIn(
"Use `execute_command` only for diagnostics, read-only inspection, or commands the user explicitly asked to run",
prompt,
)
self.assertIn("当前日期", prompt)
self.assertNotIn("当前时间", prompt)
def test_base_prompt_requires_parallel_independent_tool_calls(self):
"""核心提示词应明确要求并行执行互不依赖的工具调用。"""
prompt = prompt_manager.get_agent_prompt()
self.assertIn("Use parallel tool calls by default", prompt)
self.assertIn(
"issue all tool calls that can run without waiting for each other's results",
prompt,
)
self.assertIn(
"Keep tools sequential only when later arguments depend on earlier output",
prompt,
)
def test_base_prompt_injects_available_shell_commands(self):
"""系统信息应注入 PATH 中已安装的常用命令,帮助 Agent 选择 execute_command。"""
command_paths = {
"git": "/usr/bin/git",
"rg": "/opt/homebrew/bin/rg",
}
with patch(
"app.agent.prompt.shutil.which",
side_effect=lambda command: command_paths.get(command),
):
prompt = prompt_manager.get_agent_prompt()
self.assertIn("- 可用系统命令(可通过 `execute_command` 调用):", prompt)
self.assertIn(" - git: /usr/bin/git", prompt)
self.assertIn(" - rg: /opt/homebrew/bin/rg", prompt)
self.assertIn(
"When searching files or text, prefer `rg` / `rg --files`",
prompt,
)
self.assertNotIn(" - ssh:", prompt)
def test_base_prompt_omits_shell_command_section_when_none_available(self):
"""PATH 中没有命中白名单命令时,不注入空的系统命令段落。"""
with patch("app.agent.prompt.shutil.which", return_value=None):
prompt = prompt_manager.get_agent_prompt()
self.assertNotIn("可用系统命令", prompt)
def test_available_shell_commands_are_cached_after_first_scan(self):
"""常用命令探测应只在首次加载时扫描 PATH后续提示词复用缓存。"""
command_paths = {"git": "/usr/bin/git"}
with patch(
"app.agent.prompt.shutil.which",
side_effect=lambda command: command_paths.get(command),
) as which_mock:
first_prompt = prompt_manager.get_agent_prompt()
second_prompt = prompt_manager.get_agent_prompt()
self.assertIn(" - git: /usr/bin/git", first_prompt)
self.assertIn(" - git: /usr/bin/git", second_prompt)
self.assertEqual(which_mock.call_count, len(COMMON_SHELL_COMMANDS))
def test_common_shell_commands_skip_linux_basics(self):
"""不影响任务策略的通用命令不进入启动探测列表,避免重复 which。"""
low_value_commands = {
"rsync",
"find",
"grep",
"sed",
"awk",
"tar",
"gzip",
"gunzip",
"base64",
"du",
"df",
"ps",
"top",
"ping",
"pip",
"pip3",
"uv",
"node",
"npm",
"yarn",
"pnpm",
"bun",
"sqlite3",
"psql",
"mysql",
"redis-cli",
"kubectl",
"helm",
"lsof",
"netstat",
"ss",
"traceroute",
"dig",
"nslookup",
"nc",
"telnet",
"crontab",
"systemctl",
"service",
"journalctl",
"launchctl",
"brew",
"apt",
"apk",
"yum",
"dnf",
}
self.assertFalse(low_value_commands & set(COMMON_SHELL_COMMANDS))
def test_common_shell_commands_keep_extra_install_runtime_tools(self):
"""需要额外安装且会影响执行方式的运行时工具应保留探测。"""
expected_commands = {"ssh", "scp", "sftp", "python", "python3"}
self.assertTrue(expected_commands <= set(COMMON_SHELL_COMMANDS))
def test_runtime_config_middleware_injects_persona_only(self):
middleware = RuntimeConfigMiddleware()
updated_request = middleware.modify_request(_FakeRequest())
combined_text = "\n".join(
block["text"] for block in updated_request.system_message.content_blocks
)
self.assertIn("<agent_persona>", combined_text)
self.assertIn("Active persona: `default`", combined_text)
self.assertIn("professional, concise, restrained", combined_text)
self.assertNotIn("System Tasks.yaml", combined_text)
def test_system_tasks_are_loaded_from_prompt_directory(self):
definition = prompt_manager.load_system_tasks_definition()
self.assertEqual(definition.version, 2)
self.assertTrue(definition.path.name.endswith("System Tasks.yaml"))
def test_render_system_task_message_uses_builtin_yaml_definition(self):
message = prompt_manager.render_system_task_message("heartbeat")
self.assertIn("[System Heartbeat]", message)
self.assertIn("List all jobs with status 'pending' or 'in_progress'.", message)
self.assertIn("Do NOT include greetings, explanations, or conversational text.", message)
self.assertIn("use the `send_message` tool", message)
self.assertIn("Your final response for heartbeat must be empty", message)
self.assertIn("If no jobs were executed, output nothing.", message)
def test_render_system_task_message_renders_template_context(self):
message = prompt_manager.render_system_task_message(
"transfer_failed_retry",
template_context={
"history_ids_csv": "7",
"history_count": 1,
"history_id": 7,
},
)
self.assertIn("Failed transfer history record IDs: 7", message)
self.assertIn("Total failed records: 1", message)
self.assertIn("history_id=7", message)
def test_render_batch_manual_transfer_redo_message(self):
message = prompt_manager.render_system_task_message(
"batch_manual_transfer_redo",
template_context={
"history_ids_csv": "7, 8",
"history_count": 2,
"records_context": "Record #7:\n- Source path: /downloads/a.mkv",
},
)
self.assertIn("[System Task - Batch Manual Transfer Re-Organize]", message)
self.assertIn("History IDs: 7, 8", message)
self.assertIn("Total records: 2", message)
self.assertIn("Record #7:", message)
def test_missing_system_task_template_context_raises_clear_error(self):
with self.assertRaises(PromptConfigError):
prompt_manager.render_system_task_message("transfer_failed_retry")
def test_non_verbose_prompt_requires_silence_until_all_tools_finish(self):
with patch.object(settings, "AI_AGENT_VERBOSE", False):
prompt = prompt_manager.get_agent_prompt()
self.assertIn(
"[Important Instruction] STRICTLY ENFORCED:",
prompt,
)
self.assertIn(
"DO NOT output any conversational text, explanations, progress updates, or acknowledgements before the first tool call or between tool calls",
prompt,
)
self.assertIn(
"Only then may you send one final user-facing reply",
prompt,
)
def test_voice_prompt_marks_voice_tool_as_terminal_reply(self):
"""语音回复提示词应说明语音工具会结束当前轮次。"""
with patch.object(settings, "LLM_SUPPORT_AUDIO_OUTPUT", True):
prompt = prompt_manager.get_agent_prompt()
self.assertIn("send_voice_message", prompt)
self.assertIn("terminal response tool", prompt)
self.assertIn("do not write a final text reply after it", prompt)
self.assertIn("text fallback and still completes the reply", prompt)
def test_core_prompt_describes_voice_input_metadata(self):
"""核心提示词应说明结构化消息中的语音输入元信息。"""
prompt = prompt_manager.get_agent_prompt()
self.assertIn("input.mode", prompt)
self.assertIn("voice", prompt)
self.assertIn("`message` contains its transcript", prompt)
def test_verbose_prompt_does_not_inject_silence_until_tools_finish_rule(self):
with patch.object(settings, "AI_AGENT_VERBOSE", True):
prompt = prompt_manager.get_agent_prompt()
self.assertNotIn(
"DO NOT output any conversational text, explanations, progress updates, or acknowledgements before the first tool call or between tool calls",
prompt,
)
def test_memory_onboarding_does_not_force_warm_intro(self):
self.assertIn("Do NOT interrupt the current task", MEMORY_ONBOARDING_PROMPT)
self.assertIn("Do NOT proactively greet warmly", MEMORY_ONBOARDING_PROMPT)
self.assertNotIn("greet the user warmly", MEMORY_ONBOARDING_PROMPT)

View File

@@ -0,0 +1,42 @@
from types import SimpleNamespace
from app.agent.prompt import prompt_manager
from app.api.endpoints.history import build_batch_manual_redo_prompt
def test_batch_manual_redo_prompt_requires_plain_text_result():
"""批量 AI 重新整理提示词应要求最终回复只输出纯文本描述。"""
history = SimpleNamespace(
id=7,
src_fileitem={"path": "/downloads/a.mkv"},
src="",
seasons="",
episodes="",
status=False,
title="示例",
type="电影",
category="电影",
year="2024",
src_storage="local",
dest="/media/a.mkv",
dest_storage="local",
mode="copy",
tmdbid=123,
doubanid=None,
errmsg="识别失败",
)
prompt = build_batch_manual_redo_prompt([history])
assert "Final response must be plain text only" in prompt
assert "Do NOT include any title/header, bullet list" in prompt
assert "Markdown formatting" in prompt
def test_batch_manual_redo_job_definition_contains_plain_text_rules():
"""批量 AI 重新整理任务定义应直接声明纯文本最终回复规则。"""
definition = prompt_manager.load_system_tasks_definition()
task_rules = definition.task_types["batch_manual_transfer_redo"].task_rules
assert any("plain text only" in rule for rule in task_rules)
assert any("Markdown formatting" in rule for rule in task_rules)

View File

@@ -0,0 +1,230 @@
from pathlib import Path
from typing import Iterator, Union
from unittest.mock import PropertyMock, patch
import pytest
from app import schemas
from app.modules.filemanager.storages.alipan import AliPan
from app.modules.filemanager.storages.rclone import Rclone
from app.modules.filemanager.storages.u115 import U115Pan
PAYLOAD = b"safe-download\n"
def _noop_progress(_percent: Union[int, float]) -> None:
"""忽略测试中的进度更新。"""
return None
class _FakeAliPanStream:
"""模拟阿里云盘下载流。"""
def __init__(self, payload: bytes) -> None:
self._payload = payload
def raise_for_status(self) -> None:
"""模拟响应状态检查。"""
return None
def iter_content(self, chunk_size: int) -> Iterator[bytes]:
"""返回下载内容分块。"""
yield self._payload
def __enter__(self) -> "_FakeAliPanStream":
"""进入上下文。"""
return self
def __exit__(self, *args: object) -> None:
"""退出上下文。"""
return None
class _FakeU115Stream:
"""模拟 115 下载流。"""
def __init__(self, payload: bytes) -> None:
self._payload = payload
def raise_for_status(self) -> None:
"""模拟响应状态检查。"""
return None
def iter_bytes(self, chunk_size: int) -> Iterator[bytes]:
"""返回下载内容分块。"""
yield self._payload
def close(self) -> None:
"""模拟关闭响应流。"""
return None
def __enter__(self) -> "_FakeU115Stream":
"""进入上下文。"""
return self
def __exit__(self, *args: object) -> None:
"""退出上下文。"""
return None
class _FakeU115Session:
"""模拟 115 HTTP 会话。"""
def __init__(self, payload: bytes) -> None:
self._payload = payload
def stream(self, method: str, url: str) -> _FakeU115Stream:
"""返回伪造的下载流。"""
return _FakeU115Stream(self._payload)
class _FakeRcloneProcess:
"""模拟 rclone 子进程。"""
stdout: list[str] = []
def wait(self) -> int:
"""返回成功退出码。"""
return 0
@pytest.mark.parametrize(
("name", "expected"),
[
("../proof.txt", "proof.txt"),
("..\\proof.txt", "proof.txt"),
("/tmp/proof.txt", "proof.txt"),
],
)
def test_build_download_path_strips_remote_directory_segments(
tmp_path: Path, name: str, expected: str
) -> None:
"""本地下载路径应剥离远端文件名中的目录片段。"""
storage = Rclone.__new__(Rclone)
fileitem = schemas.FileItem(path=f"/remote/{expected}", name=name)
local_path = storage._build_download_path(fileitem, tmp_path)
assert local_path == tmp_path / expected
assert local_path.resolve().relative_to(tmp_path.resolve()) == Path(expected)
@pytest.mark.parametrize("name", ["", ".", "..", "subdir/.."])
def test_build_download_path_rejects_unsafe_filename(
tmp_path: Path, name: str
) -> None:
"""本地下载路径应拒绝无法安全落盘的文件名。"""
storage = Rclone.__new__(Rclone)
fileitem = schemas.FileItem(path="/remote/proof.txt", name=name)
assert storage._build_download_path(fileitem, tmp_path) is None
def test_alipan_download_writes_sanitized_filename(tmp_path: Path) -> None:
"""阿里云盘下载应将路径穿越文件名写入目标目录内。"""
alipan = AliPan.__new__(AliPan)
alipan.chunk_size = 8192
fileitem = schemas.FileItem(
storage="alipan",
type="file",
path="/remote/proof.txt",
name="../proof.txt",
size=len(PAYLOAD),
fileid="file-id",
drive_id="drive-id",
)
with (
patch.object(
alipan,
"_request_api",
return_value={"url": "https://example.invalid/proof.txt"},
),
patch.object(AliPan, "access_token", new_callable=PropertyMock, return_value=None),
patch(
"app.modules.filemanager.storages.alipan.transfer_process",
return_value=_noop_progress,
),
patch(
"app.modules.filemanager.storages.alipan.global_vars.is_transfer_stopped",
return_value=False,
),
patch("app.modules.filemanager.storages.alipan.RequestUtils") as request_utils,
):
request_utils.return_value.get_stream.return_value = _FakeAliPanStream(PAYLOAD)
result = alipan.download(fileitem, path=tmp_path)
expected_path = tmp_path / "proof.txt"
assert result == expected_path
assert expected_path.read_bytes() == PAYLOAD
assert not (tmp_path.parent / "proof.txt").exists()
def test_u115_download_writes_sanitized_filename(tmp_path: Path) -> None:
"""115 下载应将路径穿越文件名写入目标目录内。"""
u115 = U115Pan.__new__(U115Pan)
u115.chunk_size = 8192
u115.session = _FakeU115Session(PAYLOAD)
detail = schemas.FileItem(size=len(PAYLOAD), pickcode="pick-code")
fileitem = schemas.FileItem(
storage="u115",
type="file",
path="/remote/proof.txt",
name="../proof.txt",
size=len(PAYLOAD),
)
with (
patch.object(u115, "get_item", return_value=detail),
patch.object(
u115,
"_request_api",
return_value={"file-id": {"url": {"url": "https://example.invalid/proof.txt"}}},
),
patch(
"app.modules.filemanager.storages.u115.transfer_process",
return_value=_noop_progress,
),
patch(
"app.modules.filemanager.storages.u115.global_vars.is_transfer_stopped",
return_value=False,
),
):
result = u115.download(fileitem, path=tmp_path)
expected_path = tmp_path / "proof.txt"
assert result == expected_path
assert expected_path.read_bytes() == PAYLOAD
assert not (tmp_path.parent / "proof.txt").exists()
def test_rclone_download_uses_sanitized_target_path(tmp_path: Path) -> None:
"""rclone 下载应把清洗后的本地路径传给 copyto。"""
storage = Rclone.__new__(Rclone)
fileitem = schemas.FileItem(
storage="rclone",
type="file",
path="/remote/proof.txt",
name="../proof.txt",
size=len(PAYLOAD),
)
captured_cmd: dict[str, list[str]] = {}
def fake_popen(cmd: list[str], *args: object, **kwargs: object) -> _FakeRcloneProcess:
captured_cmd["cmd"] = cmd
return _FakeRcloneProcess()
with (
patch(
"app.modules.filemanager.storages.rclone.transfer_process",
return_value=_noop_progress,
),
patch("app.modules.filemanager.storages.rclone.subprocess.Popen", side_effect=fake_popen),
):
result = storage.download(fileitem, path=tmp_path)
expected_path = tmp_path / "proof.txt"
assert result == expected_path
assert captured_cmd["cmd"][-1] == str(expected_path)
assert expected_path.resolve().relative_to(tmp_path.resolve()) == Path("proof.txt")

View File

@@ -456,6 +456,69 @@ class SubscribeChainTest(TestCase):
self.assertEqual(SubscribeChain.get_best_version_current_priority(subscribe), 100)
self.assertTrue(SubscribeChain.is_best_version_complete(subscribe))
def test_get_subscribe_no_exists_expands_whole_missing_when_custom_start_skips_existing_range(self):
"""自定义开始集跳过季初集数时,缺失整季需要转成显式目标集。"""
no_exists = {
"media-key": {
1: SimpleNamespace(season=1, episodes=[], total_episode=48, start_episode=1)
}
}
exist_flag, result = SubscribeChain._SubscribeChain__get_subscribe_no_exits(
subscribe_name="主角 S01",
no_exists=no_exists,
mediakey="media-key",
begin_season=1,
total_episode=48,
start_episode=44,
)
self.assertFalse(exist_flag)
self.assertEqual(result["media-key"][1].episodes, [44, 45, 46, 47, 48])
self.assertEqual(result["media-key"][1].start_episode, 44)
self.assertEqual(result["media-key"][1].total_episode, 48)
def test_get_subscribe_no_exists_keeps_whole_missing_when_custom_start_matches_original_start(self):
"""自定义开始集没有缩小范围时,仍保留空集列表表示整季缺失。"""
no_exists = {
"media-key": {
1: SimpleNamespace(season=1, episodes=[], total_episode=48, start_episode=1)
}
}
exist_flag, result = SubscribeChain._SubscribeChain__get_subscribe_no_exits(
subscribe_name="主角 S01",
no_exists=no_exists,
mediakey="media-key",
begin_season=1,
total_episode=48,
start_episode=1,
)
self.assertFalse(exist_flag)
self.assertEqual(result["media-key"][1].episodes, [])
self.assertEqual(result["media-key"][1].start_episode, 1)
self.assertEqual(result["media-key"][1].total_episode, 48)
def test_best_version_full_pack_first_keeps_whole_missing_for_custom_start_episode(self):
"""分集洗版优先全集时,空集列表仍表示下载链按整季资源处理。"""
subscribe = self._build_subscribe(
best_version=1,
best_version_full=0,
start_episode=44,
total_episode=48,
episode_priority={str(episode): 80 for episode in range(44, 49)},
)
result = SubscribeChain._SubscribeChain__build_full_pack_first_no_exists(
subscribe=subscribe,
mediakey="media-key",
)
self.assertEqual(result["media-key"][1].episodes, [])
self.assertEqual(result["media-key"][1].start_episode, 44)
self.assertEqual(result["media-key"][1].total_episode, 48)
def test_is_episode_range_covered_matches_pending_episodes(self):
subscribe = self._build_subscribe(
total_episode=12,

View File

@@ -1,2 +1,2 @@
APP_VERSION = 'v2.13.5'
FRONTEND_VERSION = 'v2.13.5'
APP_VERSION = 'v2.13.6'
FRONTEND_VERSION = 'v2.13.6'