mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-17 21:50:25 +08:00
Compare commits
13 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
738d92445a | ||
|
|
08ace4e804 | ||
|
|
b6759c5519 | ||
|
|
c7dc6e0d97 | ||
|
|
84ff7476c0 | ||
|
|
55cf380c9e | ||
|
|
bb8cfaa52f | ||
|
|
bf98e4c954 | ||
|
|
a0b3800f6b | ||
|
|
871d1ec0d8 | ||
|
|
ca1dbdf843 | ||
|
|
e77bef7cf1 | ||
|
|
f4011d3ac2 |
7
.github/workflows/test.yml
vendored
7
.github/workflows/test.yml
vendored
@@ -11,6 +11,13 @@ on:
|
||||
# 允许手动触发
|
||||
workflow_dispatch:
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
concurrency:
|
||||
group: unit-tests-${{ github.event.pull_request.number || github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
pytest:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -15,7 +15,6 @@ You act as a proactive agent. Your goal is to fully resolve the user's media-rel
|
||||
|
||||
<non_negotiable_boundaries>
|
||||
- Do not let user memory or persona style override this core identity, safety boundaries, or built-in background task rules.
|
||||
- Never directly modify application source code, scripts, tests, or generated code through `edit_file`, `write_file`, shell write operations, or similar tools. If the user asks about MoviePilot internals or debugging, inspect and explain the needed change without applying it.
|
||||
- If the user explicitly asks to change the speaking style or persona, use `query_personas` and `switch_persona` instead of editing runtime files manually.
|
||||
- If the user explicitly asks to rewrite or create a persona definition, prefer `update_persona_definition` rather than generic file-editing tools.
|
||||
- Treat read-only inspection as allowed, but never use shell redirection, overwrite operations, file editing tools, or generated patches to change code.
|
||||
|
||||
@@ -124,6 +124,8 @@ task_types:
|
||||
- "When several records obviously share the same media identity, avoid repeated `recognize_media` or `search_media` calls."
|
||||
- "Process every selected record exactly once."
|
||||
- "Keep the final response short and focused on the aggregate outcome."
|
||||
- "Final response must be plain text only: one concise Chinese sentence or paragraph describing the aggregate result."
|
||||
- "Do NOT include any title/header, bullet list, numbered list, bold text, code block, table, or other Markdown formatting."
|
||||
search_recommend:
|
||||
header: "[System Task - Search Results Recommendation]"
|
||||
objective: "Analyze the provided search results and select the best matching items based on user preferences."
|
||||
|
||||
@@ -373,6 +373,9 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
|
||||
"wechat": "WECHAT_BOT_CHAT_ID",
|
||||
"feishu": "FEISHU_OPEN_ID",
|
||||
"wechatclawbot": "WECHATCLAWBOT_DEFAULT_TARGET",
|
||||
"discord": "DISCORD_CHANNEL_ID",
|
||||
"slack": "SLACK_CHANNEL",
|
||||
"qqbot": "QQ_OPENID",
|
||||
}
|
||||
|
||||
admin_key = admin_key_map.get(channel_type)
|
||||
|
||||
@@ -2785,9 +2785,16 @@ class SubscribeChain(ChainBase):
|
||||
# 更新剧集列表、开始集数、总集数
|
||||
if not episode_list:
|
||||
# 整季缺失
|
||||
episodes = []
|
||||
start_episode = start_episode or start
|
||||
total_episode = total_episode or total
|
||||
original_start = start if start is not None else 1
|
||||
# 空集列表会被下载链解释为整季下载;当订阅开始集裁掉季初范围时,需要转成显式集数。
|
||||
if start_episode and total_episode and start_episode > original_start:
|
||||
episodes = list(range(start_episode, total_episode + 1))
|
||||
if not episodes:
|
||||
return True, {}
|
||||
else:
|
||||
episodes = []
|
||||
else:
|
||||
# 部分缺失
|
||||
if not start_episode \
|
||||
|
||||
@@ -9,10 +9,10 @@ import sys
|
||||
import threading
|
||||
from asyncio import AbstractEventLoop
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type
|
||||
from typing import Any, Dict, List, Optional, Tuple, Type, Union, get_origin, get_args
|
||||
from urllib.parse import quote, urlencode, urlparse
|
||||
|
||||
from dotenv import set_key
|
||||
from dotenv import set_key, unset_key
|
||||
from pydantic import BaseModel, Field, ConfigDict, model_validator
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
@@ -690,6 +690,18 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
|
||||
if isinstance(value, str):
|
||||
value = value.strip()
|
||||
|
||||
# 处理 Optional 类型:当值为空字符串且类型允许 None 时,转为 None
|
||||
# 兼容 typing.Union (Python 3.9) 与 types.UnionType (Python 3.10+ PEP 604)
|
||||
origin = get_origin(expected_type)
|
||||
is_union = origin is Union or getattr(origin, "__name__", None) == "UnionType"
|
||||
if (
|
||||
is_union
|
||||
and type(None) in get_args(expected_type)
|
||||
and isinstance(value, str)
|
||||
and not value
|
||||
):
|
||||
return default, str(default) != str(original_value)
|
||||
|
||||
try:
|
||||
if expected_type is bool:
|
||||
if isinstance(value, bool):
|
||||
@@ -812,13 +824,19 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
|
||||
logger.warning(message)
|
||||
return False, message
|
||||
else:
|
||||
# 当值为 None 时,从 env 文件中删除该键,恢复为默认值
|
||||
if converted_value is None:
|
||||
unset_key(
|
||||
dotenv_path=SystemUtils.get_env_path(),
|
||||
key_to_unset=field_name,
|
||||
)
|
||||
logger.info(f"配置项 '{field_name}' 已清空,从 'app.env' 中移除")
|
||||
return True, message
|
||||
# 如果是列表、字典或集合类型,将其转换为JSON字符串
|
||||
if isinstance(converted_value, (list, dict, set)):
|
||||
value_to_write = json.dumps(converted_value)
|
||||
else:
|
||||
value_to_write = (
|
||||
str(converted_value) if converted_value is not None else ""
|
||||
)
|
||||
value_to_write = str(converted_value)
|
||||
|
||||
set_key(
|
||||
dotenv_path=SystemUtils.get_env_path(),
|
||||
@@ -967,7 +985,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
|
||||
|
||||
@property
|
||||
def PROXY(self):
|
||||
if self.PROXY_HOST:
|
||||
if self.PROXY_HOST and self.PROXY_HOST.strip():
|
||||
return {
|
||||
"http": self.PROXY_HOST,
|
||||
"https": self.PROXY_HOST,
|
||||
@@ -1009,7 +1027,7 @@ class Settings(BaseSettings, ConfigModel, LogConfigModel):
|
||||
|
||||
@property
|
||||
def PROXY_SERVER(self):
|
||||
if self.PROXY_HOST:
|
||||
if self.PROXY_HOST and self.PROXY_HOST.strip():
|
||||
try:
|
||||
parsed = urlparse(self.PROXY_HOST)
|
||||
if not parsed.scheme:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from pathlib import Path
|
||||
from pathlib import Path, PurePosixPath
|
||||
from typing import Optional, List, Dict, Tuple, Callable, Union
|
||||
|
||||
from tqdm import tqdm
|
||||
@@ -105,6 +105,38 @@ class StorageBase(metaclass=ABCMeta):
|
||||
self.storagehelper.reset_storage(self.schema.value)
|
||||
self.init_storage()
|
||||
|
||||
@staticmethod
|
||||
def _safe_download_name(name: Optional[str]) -> Optional[str]:
|
||||
"""
|
||||
提取可安全落盘的文件名。
|
||||
"""
|
||||
if not name:
|
||||
return None
|
||||
|
||||
safe_name = PurePosixPath(str(name).replace("\\", "/")).name
|
||||
if safe_name in ("", ".", ".."):
|
||||
return None
|
||||
return safe_name
|
||||
|
||||
def _build_download_path(
|
||||
self, fileitem: schemas.FileItem, path: Path
|
||||
) -> Optional[Path]:
|
||||
"""
|
||||
构造本地下载路径,避免远端文件名携带目录片段时越过目标目录。
|
||||
"""
|
||||
safe_name = self._safe_download_name(fileitem.name)
|
||||
if not safe_name:
|
||||
logger.error(f"【存储】下载文件名无效:{fileitem.name}")
|
||||
return None
|
||||
|
||||
local_path = path / safe_name
|
||||
try:
|
||||
local_path.resolve().relative_to(path.resolve())
|
||||
except ValueError:
|
||||
logger.error(f"【存储】下载路径越界:{fileitem.name} -> {local_path}")
|
||||
return None
|
||||
return local_path
|
||||
|
||||
@abstractmethod
|
||||
def check(self) -> bool:
|
||||
"""
|
||||
|
||||
@@ -741,7 +741,9 @@ class AliPan(StorageBase, metaclass=WeakSingleton):
|
||||
logger.error(f"【阿里云盘】下载链接为空: {fileitem.name}")
|
||||
return None
|
||||
|
||||
local_path = (path or settings.TEMP_PATH) / fileitem.name
|
||||
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
|
||||
if not local_path:
|
||||
return None
|
||||
|
||||
# 获取文件大小
|
||||
file_size = fileitem.size
|
||||
|
||||
@@ -340,7 +340,9 @@ class Rclone(StorageBase):
|
||||
"""
|
||||
带实时进度显示的下载
|
||||
"""
|
||||
local_path = (path or settings.TEMP_PATH) / fileitem.name
|
||||
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
|
||||
if not local_path:
|
||||
return None
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【rclone】开始下载: {fileitem.name} -> {local_path}")
|
||||
|
||||
@@ -511,7 +511,9 @@ class SMB(StorageBase, metaclass=WeakSingleton):
|
||||
"""
|
||||
带实时进度显示的下载
|
||||
"""
|
||||
local_path = (path or settings.TEMP_PATH) / fileitem.name
|
||||
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
|
||||
if not local_path:
|
||||
return None
|
||||
smb_path = self._normalize_path(fileitem.path)
|
||||
try:
|
||||
self._check_connection()
|
||||
|
||||
@@ -830,7 +830,9 @@ class U115Pan(StorageBase, metaclass=WeakSingleton):
|
||||
logger.error(f"【115】下载链接为空: {fileitem.name}")
|
||||
return None
|
||||
|
||||
local_path = (path or settings.TEMP_PATH) / fileitem.name
|
||||
local_path = self._build_download_path(fileitem, path or settings.TEMP_PATH)
|
||||
if not local_path:
|
||||
return None
|
||||
|
||||
# 获取文件大小
|
||||
file_size = detail.size
|
||||
|
||||
@@ -100,6 +100,7 @@ def send_proactive_c2c_message(
|
||||
openid: str,
|
||||
content: str,
|
||||
use_markdown: bool = False,
|
||||
keyboard: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
主动发送 C2C 单聊消息(不需要 msg_id)
|
||||
@@ -108,11 +109,14 @@ def send_proactive_c2c_message(
|
||||
:param openid: 用户 openid
|
||||
:param content: 消息内容
|
||||
:param use_markdown: 是否使用 Markdown 格式(需机器人开通 Markdown 能力)
|
||||
:param keyboard: 键盘按钮配置
|
||||
"""
|
||||
if not content or not content.strip():
|
||||
raise ValueError("主动消息内容不能为空")
|
||||
content = content.strip()
|
||||
body = {"markdown": {"content": content}, "msg_type": 2} if use_markdown else {"content": content, "msg_type": 0}
|
||||
if keyboard:
|
||||
body["keyboard"] = {"content": keyboard}
|
||||
return _api_request(
|
||||
access_token, "POST", f"/v2/users/{openid}/messages", body
|
||||
)
|
||||
@@ -123,6 +127,7 @@ def send_proactive_group_message(
|
||||
group_openid: str,
|
||||
content: str,
|
||||
use_markdown: bool = False,
|
||||
keyboard: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
主动发送群聊消息(不需要 msg_id)
|
||||
@@ -131,11 +136,14 @@ def send_proactive_group_message(
|
||||
:param group_openid: 群聊 openid
|
||||
:param content: 消息内容
|
||||
:param use_markdown: 是否使用 Markdown 格式(需机器人开通 Markdown 能力)
|
||||
:param keyboard: 键盘按钮配置
|
||||
"""
|
||||
if not content or not content.strip():
|
||||
raise ValueError("主动消息内容不能为空")
|
||||
content = content.strip()
|
||||
body = {"markdown": {"content": content}, "msg_type": 2} if use_markdown else {"content": content, "msg_type": 0}
|
||||
if keyboard:
|
||||
body["keyboard"] = {"content": keyboard}
|
||||
return _api_request(
|
||||
access_token, "POST", f"/v2/groups/{group_openid}/messages", body
|
||||
)
|
||||
@@ -146,11 +154,14 @@ def send_c2c_message(
|
||||
openid: str,
|
||||
content: str,
|
||||
msg_id: Optional[str] = None,
|
||||
keyboard: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""被动回复 C2C 单聊消息(1 小时内最多 4 次)"""
|
||||
body = {"content": content, "msg_type": 0, "msg_seq": 1}
|
||||
if msg_id:
|
||||
body["msg_id"] = msg_id
|
||||
if keyboard:
|
||||
body["keyboard"] = {"content": keyboard}
|
||||
return _api_request(
|
||||
access_token, "POST", f"/v2/users/{openid}/messages", body
|
||||
)
|
||||
@@ -161,11 +172,14 @@ def send_group_message(
|
||||
group_openid: str,
|
||||
content: str,
|
||||
msg_id: Optional[str] = None,
|
||||
keyboard: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""被动回复群聊消息(1 小时内最多 4 次)"""
|
||||
body = {"content": content, "msg_type": 0, "msg_seq": 1}
|
||||
if msg_id:
|
||||
body["msg_id"] = msg_id
|
||||
if keyboard:
|
||||
body["keyboard"] = {"content": keyboard}
|
||||
return _api_request(
|
||||
access_token, "POST", f"/v2/groups/{group_openid}/messages", body
|
||||
)
|
||||
@@ -188,6 +202,7 @@ def send_message(
|
||||
content: str,
|
||||
msg_type: Literal["c2c", "group"] = "c2c",
|
||||
msg_id: Optional[str] = None,
|
||||
keyboard: Optional[dict] = None,
|
||||
) -> dict:
|
||||
"""
|
||||
统一发送接口
|
||||
@@ -196,11 +211,12 @@ def send_message(
|
||||
:param content: 消息内容
|
||||
:param msg_type: c2c 单聊 / group 群聊
|
||||
:param msg_id: 可选,被动回复时传入原消息 id
|
||||
:param keyboard: 可选,键盘按钮配置
|
||||
"""
|
||||
if msg_id:
|
||||
if msg_type == "c2c":
|
||||
return send_c2c_message(access_token, target, content, msg_id)
|
||||
return send_group_message(access_token, target, content, msg_id)
|
||||
return send_c2c_message(access_token, target, content, msg_id, keyboard)
|
||||
return send_group_message(access_token, target, content, msg_id, keyboard)
|
||||
if msg_type == "c2c":
|
||||
return send_proactive_c2c_message(access_token, target, content)
|
||||
return send_proactive_group_message(access_token, target, content)
|
||||
return send_proactive_c2c_message(access_token, target, content, keyboard=keyboard)
|
||||
return send_proactive_group_message(access_token, target, content, keyboard=keyboard)
|
||||
|
||||
@@ -4,6 +4,7 @@ QQ Bot Gateway WebSocket 客户端
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable, List, Optional
|
||||
@@ -108,6 +109,9 @@ def run_gateway(
|
||||
author = d.get("author", {})
|
||||
user_openid = author.get("user_openid", "")
|
||||
content = d.get("content", "").strip()
|
||||
match = re.search(r'(agent_interaction:choice:[\w\-]+:\d+|agent_choice:[\w\-]+:\d+)', content)
|
||||
if match:
|
||||
content = f"CALLBACK:{match.group(1)}"
|
||||
msg_id = d.get("id", "")
|
||||
if content:
|
||||
on_message_fn({
|
||||
@@ -122,6 +126,9 @@ def run_gateway(
|
||||
member_openid = author.get("member_openid", "")
|
||||
group_openid = d.get("group_openid", "")
|
||||
content = d.get("content", "").strip()
|
||||
match = re.search(r'(agent_interaction:choice:[\w\-]+:\d+|agent_choice:[\w\-]+:\d+)', content)
|
||||
if match:
|
||||
content = f"CALLBACK:{match.group(1)}"
|
||||
msg_id = d.get("id", "")
|
||||
if content:
|
||||
on_message_fn({
|
||||
|
||||
@@ -347,13 +347,43 @@ class QQBot:
|
||||
logger.warn("QQ Bot: 消息内容为空")
|
||||
return False
|
||||
|
||||
# 处理按钮
|
||||
buttons = kwargs.get("buttons")
|
||||
keyboard = None
|
||||
if buttons:
|
||||
rows = []
|
||||
btn_id = 1
|
||||
for row in buttons:
|
||||
btns = []
|
||||
for btn in row:
|
||||
action_type = 0 if btn.get("url") else 2
|
||||
btns.append({
|
||||
"id": str(btn_id),
|
||||
"render_data": {
|
||||
"label": btn.get("text", "按钮")[:30],
|
||||
"visited_label": btn.get("text", "按钮")[:30],
|
||||
"style": 1
|
||||
},
|
||||
"action": {
|
||||
"type": action_type,
|
||||
"data": btn.get("url") if action_type == 0 else btn.get("callback_data", ""),
|
||||
"permission": {"type": 2}
|
||||
}
|
||||
})
|
||||
btn_id += 1
|
||||
if btns:
|
||||
rows.append({"buttons": btns})
|
||||
if rows:
|
||||
keyboard = {"rows": rows}
|
||||
use_markdown = True
|
||||
|
||||
success_count = 0
|
||||
try:
|
||||
token = get_access_token(self._app_id, self._app_secret)
|
||||
for tgt, tgt_is_group in targets_to_send:
|
||||
send_fn = send_proactive_group_message if tgt_is_group else send_proactive_c2c_message
|
||||
try:
|
||||
send_fn(token, tgt, content, use_markdown=use_markdown)
|
||||
send_fn(token, tgt, content, use_markdown=use_markdown, keyboard=keyboard)
|
||||
success_count += 1
|
||||
logger.debug(f"QQ Bot: 消息已发送到 {'群' if tgt_is_group else '用户'} {tgt}")
|
||||
except Exception as e:
|
||||
@@ -371,7 +401,7 @@ class QQBot:
|
||||
plain_parts.append(link)
|
||||
plain_content = "\n".join(plain_parts).strip()
|
||||
if plain_content:
|
||||
send_fn(token, tgt, plain_content, use_markdown=False)
|
||||
send_fn(token, tgt, plain_content, use_markdown=False, keyboard=None)
|
||||
success_count += 1
|
||||
logger.debug(f"QQ Bot: Markdown 不可用,已回退纯文本发送至 {tgt}")
|
||||
else:
|
||||
|
||||
@@ -450,7 +450,12 @@ class ChannelCapabilityManager:
|
||||
ChannelCapability.RICH_TEXT,
|
||||
ChannelCapability.IMAGES,
|
||||
ChannelCapability.LINKS,
|
||||
ChannelCapability.INLINE_BUTTONS,
|
||||
ChannelCapability.CALLBACK_QUERIES,
|
||||
},
|
||||
max_buttons_per_row=5,
|
||||
max_button_rows=5,
|
||||
max_button_text_length=30,
|
||||
fallback_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
@@ -92,6 +92,9 @@ def _get_shared_async_transport(
|
||||
会话级状态由调用方在外层 AsyncClient(transport=...) 实例化时单独配置,
|
||||
每次调用用完即销毁,因此天然无 jar 累积串扰。
|
||||
"""
|
||||
# 规范化代理:拒绝空字符串等非法值,防止 httpx 抛出 Unknown scheme for proxy URL
|
||||
if proxy is not None and (not proxy or not proxy.strip()):
|
||||
proxy = None
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
@@ -899,12 +902,17 @@ class AsyncRequestUtils:
|
||||
|
||||
# 如果已经是字符串格式,直接返回
|
||||
if isinstance(proxies, str):
|
||||
return proxies
|
||||
return proxies.strip() or None
|
||||
|
||||
# 如果是字典格式,提取http或https代理
|
||||
if isinstance(proxies, dict):
|
||||
# 优先使用https代理,如果没有则使用http代理
|
||||
proxy_url = proxies.get("https") or proxies.get("http")
|
||||
# 先各自 strip,避免空白字符串阻断裂合取或回退到 http 代理
|
||||
https_proxy = proxies.get("https")
|
||||
http_proxy = proxies.get("http")
|
||||
https_proxy = https_proxy.strip() if isinstance(https_proxy, str) else None
|
||||
http_proxy = http_proxy.strip() if isinstance(http_proxy, str) else None
|
||||
proxy_url = https_proxy or http_proxy
|
||||
if proxy_url:
|
||||
return proxy_url
|
||||
|
||||
|
||||
@@ -41,7 +41,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
iproute2 \
|
||||
netcat-openbsd \
|
||||
lsof \
|
||||
ffmpeg \
|
||||
nano \
|
||||
libjemalloc2 \
|
||||
&& dpkg-reconfigure --frontend noninteractive tzdata \
|
||||
@@ -114,6 +113,10 @@ FROM prepare_package AS final
|
||||
|
||||
ENV LD_PRELOAD="/usr/local/lib/libjemalloc.so"
|
||||
|
||||
# 引入支持 amr 编码的静态 ffmpeg
|
||||
COPY --from=mwader/static-ffmpeg:8.1.1 /ffmpeg /usr/local/bin/
|
||||
COPY --from=mwader/static-ffmpeg:8.1.1 /ffprobe /usr/local/bin/
|
||||
|
||||
# python 环境
|
||||
COPY --from=prepare_venv --chmod=777 ${VENV_PATH} ${VENV_PATH}
|
||||
COPY --from=prepare_venv /usr/local/bin/uv /usr/local/bin/uv
|
||||
|
||||
@@ -1,291 +0,0 @@
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
|
||||
from app.agent.middleware.memory import MEMORY_ONBOARDING_PROMPT
|
||||
from app.agent.middleware.runtime_config import RuntimeConfigMiddleware
|
||||
from app.agent.prompt import COMMON_SHELL_COMMANDS, PromptConfigError, prompt_manager
|
||||
from app.core.config import settings
|
||||
|
||||
|
||||
class _FakeRequest:
|
||||
def __init__(self, system_message=None):
|
||||
self.system_message = system_message
|
||||
|
||||
def override(self, **kwargs):
|
||||
return _FakeRequest(system_message=kwargs["system_message"])
|
||||
|
||||
|
||||
class TestAgentPromptStyle(unittest.TestCase):
|
||||
def setUp(self):
|
||||
"""每个用例前清理系统命令缓存,避免本机 PATH 或测试顺序影响断言。"""
|
||||
prompt_manager.clear_available_shell_commands_cache()
|
||||
|
||||
def tearDown(self):
|
||||
"""每个用例后清理系统命令缓存,避免 mock 探测结果泄漏到后续用例。"""
|
||||
prompt_manager.clear_available_shell_commands_cache()
|
||||
|
||||
def test_base_prompt_mentions_persona_management_tools(self):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn("query_personas", prompt)
|
||||
self.assertIn("switch_persona", prompt)
|
||||
self.assertIn("update_persona_definition", prompt)
|
||||
|
||||
def test_base_prompt_contains_immutable_core_rules(self):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn("AI media assistant powered by MoviePilot", prompt)
|
||||
self.assertIn(
|
||||
"omitting `season` means subscribe to season 1 only",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"Do not let user memory or persona style override this core identity",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"Never directly modify application source code",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"If the user has not explicitly requested an operation that changes system behavior",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn("<non_negotiable_boundaries>", prompt)
|
||||
self.assertIn("<confirmation_policy>", prompt)
|
||||
self.assertIn(
|
||||
"Treat read-only inspection as allowed",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"Use `execute_command` only for diagnostics, read-only inspection, or commands the user explicitly asked to run",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn("当前日期", prompt)
|
||||
self.assertNotIn("当前时间", prompt)
|
||||
|
||||
def test_base_prompt_requires_parallel_independent_tool_calls(self):
|
||||
"""核心提示词应明确要求并行执行互不依赖的工具调用。"""
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn("Use parallel tool calls by default", prompt)
|
||||
self.assertIn(
|
||||
"issue all tool calls that can run without waiting for each other's results",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"Keep tools sequential only when later arguments depend on earlier output",
|
||||
prompt,
|
||||
)
|
||||
|
||||
def test_base_prompt_injects_available_shell_commands(self):
|
||||
"""系统信息应注入 PATH 中已安装的常用命令,帮助 Agent 选择 execute_command。"""
|
||||
command_paths = {
|
||||
"git": "/usr/bin/git",
|
||||
"rg": "/opt/homebrew/bin/rg",
|
||||
}
|
||||
with patch(
|
||||
"app.agent.prompt.shutil.which",
|
||||
side_effect=lambda command: command_paths.get(command),
|
||||
):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn("- 可用系统命令(可通过 `execute_command` 调用):", prompt)
|
||||
self.assertIn(" - git: /usr/bin/git", prompt)
|
||||
self.assertIn(" - rg: /opt/homebrew/bin/rg", prompt)
|
||||
self.assertIn(
|
||||
"When searching files or text, prefer `rg` / `rg --files`",
|
||||
prompt,
|
||||
)
|
||||
self.assertNotIn(" - ssh:", prompt)
|
||||
|
||||
def test_base_prompt_omits_shell_command_section_when_none_available(self):
|
||||
"""PATH 中没有命中白名单命令时,不注入空的系统命令段落。"""
|
||||
with patch("app.agent.prompt.shutil.which", return_value=None):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertNotIn("可用系统命令", prompt)
|
||||
|
||||
def test_available_shell_commands_are_cached_after_first_scan(self):
|
||||
"""常用命令探测应只在首次加载时扫描 PATH,后续提示词复用缓存。"""
|
||||
command_paths = {"git": "/usr/bin/git"}
|
||||
with patch(
|
||||
"app.agent.prompt.shutil.which",
|
||||
side_effect=lambda command: command_paths.get(command),
|
||||
) as which_mock:
|
||||
first_prompt = prompt_manager.get_agent_prompt()
|
||||
second_prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn(" - git: /usr/bin/git", first_prompt)
|
||||
self.assertIn(" - git: /usr/bin/git", second_prompt)
|
||||
self.assertEqual(which_mock.call_count, len(COMMON_SHELL_COMMANDS))
|
||||
|
||||
def test_common_shell_commands_skip_linux_basics(self):
|
||||
"""不影响任务策略的通用命令不进入启动探测列表,避免重复 which。"""
|
||||
low_value_commands = {
|
||||
"rsync",
|
||||
"find",
|
||||
"grep",
|
||||
"sed",
|
||||
"awk",
|
||||
"tar",
|
||||
"gzip",
|
||||
"gunzip",
|
||||
"base64",
|
||||
"du",
|
||||
"df",
|
||||
"ps",
|
||||
"top",
|
||||
"ping",
|
||||
"pip",
|
||||
"pip3",
|
||||
"uv",
|
||||
"node",
|
||||
"npm",
|
||||
"yarn",
|
||||
"pnpm",
|
||||
"bun",
|
||||
"sqlite3",
|
||||
"psql",
|
||||
"mysql",
|
||||
"redis-cli",
|
||||
"kubectl",
|
||||
"helm",
|
||||
"lsof",
|
||||
"netstat",
|
||||
"ss",
|
||||
"traceroute",
|
||||
"dig",
|
||||
"nslookup",
|
||||
"nc",
|
||||
"telnet",
|
||||
"crontab",
|
||||
"systemctl",
|
||||
"service",
|
||||
"journalctl",
|
||||
"launchctl",
|
||||
"brew",
|
||||
"apt",
|
||||
"apk",
|
||||
"yum",
|
||||
"dnf",
|
||||
}
|
||||
|
||||
self.assertFalse(low_value_commands & set(COMMON_SHELL_COMMANDS))
|
||||
|
||||
def test_common_shell_commands_keep_extra_install_runtime_tools(self):
|
||||
"""需要额外安装且会影响执行方式的运行时工具应保留探测。"""
|
||||
expected_commands = {"ssh", "scp", "sftp", "python", "python3"}
|
||||
|
||||
self.assertTrue(expected_commands <= set(COMMON_SHELL_COMMANDS))
|
||||
|
||||
def test_runtime_config_middleware_injects_persona_only(self):
|
||||
middleware = RuntimeConfigMiddleware()
|
||||
updated_request = middleware.modify_request(_FakeRequest())
|
||||
|
||||
combined_text = "\n".join(
|
||||
block["text"] for block in updated_request.system_message.content_blocks
|
||||
)
|
||||
|
||||
self.assertIn("<agent_persona>", combined_text)
|
||||
self.assertIn("Active persona: `default`", combined_text)
|
||||
self.assertIn("professional, concise, restrained", combined_text)
|
||||
self.assertNotIn("System Tasks.yaml", combined_text)
|
||||
|
||||
def test_system_tasks_are_loaded_from_prompt_directory(self):
|
||||
definition = prompt_manager.load_system_tasks_definition()
|
||||
|
||||
self.assertEqual(definition.version, 2)
|
||||
self.assertTrue(definition.path.name.endswith("System Tasks.yaml"))
|
||||
|
||||
def test_render_system_task_message_uses_builtin_yaml_definition(self):
|
||||
message = prompt_manager.render_system_task_message("heartbeat")
|
||||
|
||||
self.assertIn("[System Heartbeat]", message)
|
||||
self.assertIn("List all jobs with status 'pending' or 'in_progress'.", message)
|
||||
self.assertIn("Do NOT include greetings, explanations, or conversational text.", message)
|
||||
self.assertIn("use the `send_message` tool", message)
|
||||
self.assertIn("Your final response for heartbeat must be empty", message)
|
||||
self.assertIn("If no jobs were executed, output nothing.", message)
|
||||
|
||||
def test_render_system_task_message_renders_template_context(self):
|
||||
message = prompt_manager.render_system_task_message(
|
||||
"transfer_failed_retry",
|
||||
template_context={
|
||||
"history_ids_csv": "7",
|
||||
"history_count": 1,
|
||||
"history_id": 7,
|
||||
},
|
||||
)
|
||||
|
||||
self.assertIn("Failed transfer history record IDs: 7", message)
|
||||
self.assertIn("Total failed records: 1", message)
|
||||
self.assertIn("history_id=7", message)
|
||||
|
||||
def test_render_batch_manual_transfer_redo_message(self):
|
||||
message = prompt_manager.render_system_task_message(
|
||||
"batch_manual_transfer_redo",
|
||||
template_context={
|
||||
"history_ids_csv": "7, 8",
|
||||
"history_count": 2,
|
||||
"records_context": "Record #7:\n- Source path: /downloads/a.mkv",
|
||||
},
|
||||
)
|
||||
|
||||
self.assertIn("[System Task - Batch Manual Transfer Re-Organize]", message)
|
||||
self.assertIn("History IDs: 7, 8", message)
|
||||
self.assertIn("Total records: 2", message)
|
||||
self.assertIn("Record #7:", message)
|
||||
|
||||
def test_missing_system_task_template_context_raises_clear_error(self):
|
||||
with self.assertRaises(PromptConfigError):
|
||||
prompt_manager.render_system_task_message("transfer_failed_retry")
|
||||
|
||||
def test_non_verbose_prompt_requires_silence_until_all_tools_finish(self):
|
||||
with patch.object(settings, "AI_AGENT_VERBOSE", False):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn(
|
||||
"[Important Instruction] STRICTLY ENFORCED:",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"DO NOT output any conversational text, explanations, progress updates, or acknowledgements before the first tool call or between tool calls",
|
||||
prompt,
|
||||
)
|
||||
self.assertIn(
|
||||
"Only then may you send one final user-facing reply",
|
||||
prompt,
|
||||
)
|
||||
|
||||
def test_voice_prompt_marks_voice_tool_as_terminal_reply(self):
|
||||
"""语音回复提示词应说明语音工具会结束当前轮次。"""
|
||||
with patch.object(settings, "LLM_SUPPORT_AUDIO_OUTPUT", True):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn("send_voice_message", prompt)
|
||||
self.assertIn("terminal response tool", prompt)
|
||||
self.assertIn("do not write a final text reply after it", prompt)
|
||||
self.assertIn("text fallback and still completes the reply", prompt)
|
||||
|
||||
def test_core_prompt_describes_voice_input_metadata(self):
|
||||
"""核心提示词应说明结构化消息中的语音输入元信息。"""
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertIn("input.mode", prompt)
|
||||
self.assertIn("voice", prompt)
|
||||
self.assertIn("`message` contains its transcript", prompt)
|
||||
|
||||
def test_verbose_prompt_does_not_inject_silence_until_tools_finish_rule(self):
|
||||
with patch.object(settings, "AI_AGENT_VERBOSE", True):
|
||||
prompt = prompt_manager.get_agent_prompt()
|
||||
|
||||
self.assertNotIn(
|
||||
"DO NOT output any conversational text, explanations, progress updates, or acknowledgements before the first tool call or between tool calls",
|
||||
prompt,
|
||||
)
|
||||
|
||||
def test_memory_onboarding_does_not_force_warm_intro(self):
|
||||
self.assertIn("Do NOT interrupt the current task", MEMORY_ONBOARDING_PROMPT)
|
||||
self.assertIn("Do NOT proactively greet warmly", MEMORY_ONBOARDING_PROMPT)
|
||||
self.assertNotIn("greet the user warmly", MEMORY_ONBOARDING_PROMPT)
|
||||
42
tests/test_history_batch_ai_redo_prompt.py
Normal file
42
tests/test_history_batch_ai_redo_prompt.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from types import SimpleNamespace
|
||||
|
||||
from app.agent.prompt import prompt_manager
|
||||
from app.api.endpoints.history import build_batch_manual_redo_prompt
|
||||
|
||||
|
||||
def test_batch_manual_redo_prompt_requires_plain_text_result():
|
||||
"""批量 AI 重新整理提示词应要求最终回复只输出纯文本描述。"""
|
||||
history = SimpleNamespace(
|
||||
id=7,
|
||||
src_fileitem={"path": "/downloads/a.mkv"},
|
||||
src="",
|
||||
seasons="",
|
||||
episodes="",
|
||||
status=False,
|
||||
title="示例",
|
||||
type="电影",
|
||||
category="电影",
|
||||
year="2024",
|
||||
src_storage="local",
|
||||
dest="/media/a.mkv",
|
||||
dest_storage="local",
|
||||
mode="copy",
|
||||
tmdbid=123,
|
||||
doubanid=None,
|
||||
errmsg="识别失败",
|
||||
)
|
||||
|
||||
prompt = build_batch_manual_redo_prompt([history])
|
||||
|
||||
assert "Final response must be plain text only" in prompt
|
||||
assert "Do NOT include any title/header, bullet list" in prompt
|
||||
assert "Markdown formatting" in prompt
|
||||
|
||||
|
||||
def test_batch_manual_redo_job_definition_contains_plain_text_rules():
|
||||
"""批量 AI 重新整理任务定义应直接声明纯文本最终回复规则。"""
|
||||
definition = prompt_manager.load_system_tasks_definition()
|
||||
task_rules = definition.task_types["batch_manual_transfer_redo"].task_rules
|
||||
|
||||
assert any("plain text only" in rule for rule in task_rules)
|
||||
assert any("Markdown formatting" in rule for rule in task_rules)
|
||||
230
tests/test_storage_download_path.py
Normal file
230
tests/test_storage_download_path.py
Normal file
@@ -0,0 +1,230 @@
|
||||
from pathlib import Path
|
||||
from typing import Iterator, Union
|
||||
from unittest.mock import PropertyMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from app import schemas
|
||||
from app.modules.filemanager.storages.alipan import AliPan
|
||||
from app.modules.filemanager.storages.rclone import Rclone
|
||||
from app.modules.filemanager.storages.u115 import U115Pan
|
||||
|
||||
|
||||
PAYLOAD = b"safe-download\n"
|
||||
|
||||
|
||||
def _noop_progress(_percent: Union[int, float]) -> None:
|
||||
"""忽略测试中的进度更新。"""
|
||||
return None
|
||||
|
||||
|
||||
class _FakeAliPanStream:
|
||||
"""模拟阿里云盘下载流。"""
|
||||
|
||||
def __init__(self, payload: bytes) -> None:
|
||||
self._payload = payload
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
"""模拟响应状态检查。"""
|
||||
return None
|
||||
|
||||
def iter_content(self, chunk_size: int) -> Iterator[bytes]:
|
||||
"""返回下载内容分块。"""
|
||||
yield self._payload
|
||||
|
||||
def __enter__(self) -> "_FakeAliPanStream":
|
||||
"""进入上下文。"""
|
||||
return self
|
||||
|
||||
def __exit__(self, *args: object) -> None:
|
||||
"""退出上下文。"""
|
||||
return None
|
||||
|
||||
|
||||
class _FakeU115Stream:
|
||||
"""模拟 115 下载流。"""
|
||||
|
||||
def __init__(self, payload: bytes) -> None:
|
||||
self._payload = payload
|
||||
|
||||
def raise_for_status(self) -> None:
|
||||
"""模拟响应状态检查。"""
|
||||
return None
|
||||
|
||||
def iter_bytes(self, chunk_size: int) -> Iterator[bytes]:
|
||||
"""返回下载内容分块。"""
|
||||
yield self._payload
|
||||
|
||||
def close(self) -> None:
|
||||
"""模拟关闭响应流。"""
|
||||
return None
|
||||
|
||||
def __enter__(self) -> "_FakeU115Stream":
|
||||
"""进入上下文。"""
|
||||
return self
|
||||
|
||||
def __exit__(self, *args: object) -> None:
|
||||
"""退出上下文。"""
|
||||
return None
|
||||
|
||||
|
||||
class _FakeU115Session:
|
||||
"""模拟 115 HTTP 会话。"""
|
||||
|
||||
def __init__(self, payload: bytes) -> None:
|
||||
self._payload = payload
|
||||
|
||||
def stream(self, method: str, url: str) -> _FakeU115Stream:
|
||||
"""返回伪造的下载流。"""
|
||||
return _FakeU115Stream(self._payload)
|
||||
|
||||
|
||||
class _FakeRcloneProcess:
|
||||
"""模拟 rclone 子进程。"""
|
||||
|
||||
stdout: list[str] = []
|
||||
|
||||
def wait(self) -> int:
|
||||
"""返回成功退出码。"""
|
||||
return 0
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("name", "expected"),
|
||||
[
|
||||
("../proof.txt", "proof.txt"),
|
||||
("..\\proof.txt", "proof.txt"),
|
||||
("/tmp/proof.txt", "proof.txt"),
|
||||
],
|
||||
)
|
||||
def test_build_download_path_strips_remote_directory_segments(
|
||||
tmp_path: Path, name: str, expected: str
|
||||
) -> None:
|
||||
"""本地下载路径应剥离远端文件名中的目录片段。"""
|
||||
storage = Rclone.__new__(Rclone)
|
||||
fileitem = schemas.FileItem(path=f"/remote/{expected}", name=name)
|
||||
|
||||
local_path = storage._build_download_path(fileitem, tmp_path)
|
||||
|
||||
assert local_path == tmp_path / expected
|
||||
assert local_path.resolve().relative_to(tmp_path.resolve()) == Path(expected)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", ["", ".", "..", "subdir/.."])
|
||||
def test_build_download_path_rejects_unsafe_filename(
|
||||
tmp_path: Path, name: str
|
||||
) -> None:
|
||||
"""本地下载路径应拒绝无法安全落盘的文件名。"""
|
||||
storage = Rclone.__new__(Rclone)
|
||||
fileitem = schemas.FileItem(path="/remote/proof.txt", name=name)
|
||||
|
||||
assert storage._build_download_path(fileitem, tmp_path) is None
|
||||
|
||||
|
||||
def test_alipan_download_writes_sanitized_filename(tmp_path: Path) -> None:
|
||||
"""阿里云盘下载应将路径穿越文件名写入目标目录内。"""
|
||||
alipan = AliPan.__new__(AliPan)
|
||||
alipan.chunk_size = 8192
|
||||
fileitem = schemas.FileItem(
|
||||
storage="alipan",
|
||||
type="file",
|
||||
path="/remote/proof.txt",
|
||||
name="../proof.txt",
|
||||
size=len(PAYLOAD),
|
||||
fileid="file-id",
|
||||
drive_id="drive-id",
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(
|
||||
alipan,
|
||||
"_request_api",
|
||||
return_value={"url": "https://example.invalid/proof.txt"},
|
||||
),
|
||||
patch.object(AliPan, "access_token", new_callable=PropertyMock, return_value=None),
|
||||
patch(
|
||||
"app.modules.filemanager.storages.alipan.transfer_process",
|
||||
return_value=_noop_progress,
|
||||
),
|
||||
patch(
|
||||
"app.modules.filemanager.storages.alipan.global_vars.is_transfer_stopped",
|
||||
return_value=False,
|
||||
),
|
||||
patch("app.modules.filemanager.storages.alipan.RequestUtils") as request_utils,
|
||||
):
|
||||
request_utils.return_value.get_stream.return_value = _FakeAliPanStream(PAYLOAD)
|
||||
result = alipan.download(fileitem, path=tmp_path)
|
||||
|
||||
expected_path = tmp_path / "proof.txt"
|
||||
assert result == expected_path
|
||||
assert expected_path.read_bytes() == PAYLOAD
|
||||
assert not (tmp_path.parent / "proof.txt").exists()
|
||||
|
||||
|
||||
def test_u115_download_writes_sanitized_filename(tmp_path: Path) -> None:
|
||||
"""115 下载应将路径穿越文件名写入目标目录内。"""
|
||||
u115 = U115Pan.__new__(U115Pan)
|
||||
u115.chunk_size = 8192
|
||||
u115.session = _FakeU115Session(PAYLOAD)
|
||||
detail = schemas.FileItem(size=len(PAYLOAD), pickcode="pick-code")
|
||||
fileitem = schemas.FileItem(
|
||||
storage="u115",
|
||||
type="file",
|
||||
path="/remote/proof.txt",
|
||||
name="../proof.txt",
|
||||
size=len(PAYLOAD),
|
||||
)
|
||||
|
||||
with (
|
||||
patch.object(u115, "get_item", return_value=detail),
|
||||
patch.object(
|
||||
u115,
|
||||
"_request_api",
|
||||
return_value={"file-id": {"url": {"url": "https://example.invalid/proof.txt"}}},
|
||||
),
|
||||
patch(
|
||||
"app.modules.filemanager.storages.u115.transfer_process",
|
||||
return_value=_noop_progress,
|
||||
),
|
||||
patch(
|
||||
"app.modules.filemanager.storages.u115.global_vars.is_transfer_stopped",
|
||||
return_value=False,
|
||||
),
|
||||
):
|
||||
result = u115.download(fileitem, path=tmp_path)
|
||||
|
||||
expected_path = tmp_path / "proof.txt"
|
||||
assert result == expected_path
|
||||
assert expected_path.read_bytes() == PAYLOAD
|
||||
assert not (tmp_path.parent / "proof.txt").exists()
|
||||
|
||||
|
||||
def test_rclone_download_uses_sanitized_target_path(tmp_path: Path) -> None:
|
||||
"""rclone 下载应把清洗后的本地路径传给 copyto。"""
|
||||
storage = Rclone.__new__(Rclone)
|
||||
fileitem = schemas.FileItem(
|
||||
storage="rclone",
|
||||
type="file",
|
||||
path="/remote/proof.txt",
|
||||
name="../proof.txt",
|
||||
size=len(PAYLOAD),
|
||||
)
|
||||
captured_cmd: dict[str, list[str]] = {}
|
||||
|
||||
def fake_popen(cmd: list[str], *args: object, **kwargs: object) -> _FakeRcloneProcess:
|
||||
captured_cmd["cmd"] = cmd
|
||||
return _FakeRcloneProcess()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"app.modules.filemanager.storages.rclone.transfer_process",
|
||||
return_value=_noop_progress,
|
||||
),
|
||||
patch("app.modules.filemanager.storages.rclone.subprocess.Popen", side_effect=fake_popen),
|
||||
):
|
||||
result = storage.download(fileitem, path=tmp_path)
|
||||
|
||||
expected_path = tmp_path / "proof.txt"
|
||||
assert result == expected_path
|
||||
assert captured_cmd["cmd"][-1] == str(expected_path)
|
||||
assert expected_path.resolve().relative_to(tmp_path.resolve()) == Path("proof.txt")
|
||||
@@ -456,6 +456,69 @@ class SubscribeChainTest(TestCase):
|
||||
self.assertEqual(SubscribeChain.get_best_version_current_priority(subscribe), 100)
|
||||
self.assertTrue(SubscribeChain.is_best_version_complete(subscribe))
|
||||
|
||||
def test_get_subscribe_no_exists_expands_whole_missing_when_custom_start_skips_existing_range(self):
|
||||
"""自定义开始集跳过季初集数时,缺失整季需要转成显式目标集。"""
|
||||
no_exists = {
|
||||
"media-key": {
|
||||
1: SimpleNamespace(season=1, episodes=[], total_episode=48, start_episode=1)
|
||||
}
|
||||
}
|
||||
|
||||
exist_flag, result = SubscribeChain._SubscribeChain__get_subscribe_no_exits(
|
||||
subscribe_name="主角 S01",
|
||||
no_exists=no_exists,
|
||||
mediakey="media-key",
|
||||
begin_season=1,
|
||||
total_episode=48,
|
||||
start_episode=44,
|
||||
)
|
||||
|
||||
self.assertFalse(exist_flag)
|
||||
self.assertEqual(result["media-key"][1].episodes, [44, 45, 46, 47, 48])
|
||||
self.assertEqual(result["media-key"][1].start_episode, 44)
|
||||
self.assertEqual(result["media-key"][1].total_episode, 48)
|
||||
|
||||
def test_get_subscribe_no_exists_keeps_whole_missing_when_custom_start_matches_original_start(self):
|
||||
"""自定义开始集没有缩小范围时,仍保留空集列表表示整季缺失。"""
|
||||
no_exists = {
|
||||
"media-key": {
|
||||
1: SimpleNamespace(season=1, episodes=[], total_episode=48, start_episode=1)
|
||||
}
|
||||
}
|
||||
|
||||
exist_flag, result = SubscribeChain._SubscribeChain__get_subscribe_no_exits(
|
||||
subscribe_name="主角 S01",
|
||||
no_exists=no_exists,
|
||||
mediakey="media-key",
|
||||
begin_season=1,
|
||||
total_episode=48,
|
||||
start_episode=1,
|
||||
)
|
||||
|
||||
self.assertFalse(exist_flag)
|
||||
self.assertEqual(result["media-key"][1].episodes, [])
|
||||
self.assertEqual(result["media-key"][1].start_episode, 1)
|
||||
self.assertEqual(result["media-key"][1].total_episode, 48)
|
||||
|
||||
def test_best_version_full_pack_first_keeps_whole_missing_for_custom_start_episode(self):
|
||||
"""分集洗版优先全集时,空集列表仍表示下载链按整季资源处理。"""
|
||||
subscribe = self._build_subscribe(
|
||||
best_version=1,
|
||||
best_version_full=0,
|
||||
start_episode=44,
|
||||
total_episode=48,
|
||||
episode_priority={str(episode): 80 for episode in range(44, 49)},
|
||||
)
|
||||
|
||||
result = SubscribeChain._SubscribeChain__build_full_pack_first_no_exists(
|
||||
subscribe=subscribe,
|
||||
mediakey="media-key",
|
||||
)
|
||||
|
||||
self.assertEqual(result["media-key"][1].episodes, [])
|
||||
self.assertEqual(result["media-key"][1].start_episode, 44)
|
||||
self.assertEqual(result["media-key"][1].total_episode, 48)
|
||||
|
||||
def test_is_episode_range_covered_matches_pending_episodes(self):
|
||||
subscribe = self._build_subscribe(
|
||||
total_episode=12,
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
APP_VERSION = 'v2.13.5'
|
||||
FRONTEND_VERSION = 'v2.13.5'
|
||||
APP_VERSION = 'v2.13.6'
|
||||
FRONTEND_VERSION = 'v2.13.6'
|
||||
|
||||
Reference in New Issue
Block a user