Optimize agent tool async blocking paths

This commit is contained in:
jxxghp
2026-04-28 20:36:49 +08:00
parent c5b716c231
commit 5c1487a9a6
31 changed files with 950 additions and 636 deletions

View File

@@ -1,6 +1,10 @@
import asyncio
import json
import threading
from abc import ABCMeta, abstractmethod
from typing import Any, Optional
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Optional
from langchain_core.tools import BaseTool
from pydantic import PrivateAttr
@@ -19,6 +23,44 @@ class ToolChain(ChainBase):
pass
# 将常见的阻塞调用按能力域拆分到独立线程池,避免外部慢 IO 抢占同一批 worker。
_BLOCKING_BUCKET_LIMITS = {
"default": 4,
"config": 2,
"db": 4,
"downloader": 4,
"mediaserver": 4,
"plugin": 2,
"rule": 2,
"site": 4,
"storage": 4,
"subscribe": 2,
"workflow": 2,
}
_blocking_semaphores = {
bucket: asyncio.Semaphore(limit)
for bucket, limit in _BLOCKING_BUCKET_LIMITS.items()
}
_blocking_executors: dict[str, ThreadPoolExecutor] = {}
_blocking_executor_lock = threading.Lock()
def _get_blocking_executor(bucket: str) -> ThreadPoolExecutor:
"""按桶懒加载线程池,避免在导入阶段创建过多 worker。"""
with _blocking_executor_lock:
executor = _blocking_executors.get(bucket)
if executor:
return executor
limit = _BLOCKING_BUCKET_LIMITS[bucket]
executor = ThreadPoolExecutor(
max_workers=limit,
thread_name_prefix=f"agent-tool-{bucket}",
)
_blocking_executors[bucket] = executor
return executor
class MoviePilotTool(BaseTool, metaclass=ABCMeta):
"""
MoviePilot专用工具基类LangChain v1 / langchain_core
@@ -130,6 +172,23 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
"""子类实现具体的工具执行逻辑"""
raise NotImplementedError
@staticmethod
async def run_blocking(
bucket: str, func: Callable[..., Any], *args: Any, **kwargs: Any
) -> Any:
"""
在受控线程池中运行阻塞型同步代码,避免拖住 FastAPI 主事件循环。
"""
bucket_name = bucket if bucket in _BLOCKING_BUCKET_LIMITS else "default"
semaphore = _blocking_semaphores[bucket_name]
bound_call = partial(func, *args, **kwargs)
async with semaphore:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
_get_blocking_executor(bucket_name), bound_call
)
def set_message_attr(self, channel: str, source: str, username: str):
"""
设置消息属性
@@ -165,6 +224,8 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
if not self._channel or not self._source:
return None
# 渠道配置来自 SystemConfigOper 内存缓存,可以直接读取;
# 只有用户信息需要走异步数据库查询。
user_id_str = str(self._user_id) if self._user_id else None
channel_type_map = {
@@ -220,7 +281,7 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
return None
user = (
UserOper().get_by_name(self._username)
await UserOper().async_get_by_name(self._username)
if self._username
else None
)
@@ -235,7 +296,7 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
)
else:
user = (
UserOper().get_by_name(self._username)
await UserOper().async_get_by_name(self._username)
if self._username
else None
)

View File

@@ -104,6 +104,29 @@ class AddDownloadTool(MoviePilotTool):
return None
return context
@classmethod
async def _async_resolve_cached_context(cls, torrent_ref: str) -> Optional[Context]:
"""异步读取最近搜索缓存,避免在协程里直接访问同步文件缓存。"""
ref = str(torrent_ref).strip()
if ":" not in ref:
return None
try:
ref_hash, ref_index = ref.split(":", 1)
index = int(ref_index)
except (TypeError, ValueError):
return None
if index < 1:
return None
results = await SearchChain().async_last_search_results() or []
if index > len(results):
return None
context = results[index - 1]
if not ref_hash or cls._build_torrent_ref(context) != ref_hash:
return None
return context
@staticmethod
def _merge_labels_with_system_tag(labels: Optional[str]) -> Optional[str]:
"""合并用户标签与系统默认标签,确保任务可被系统管理"""
@@ -164,6 +187,43 @@ class AddDownloadTool(MoviePilotTool):
return Path(FileURI(storage=dir_conf.storage or "local", path=dir_conf.download_path).uri)
@staticmethod
def _download_direct_sync(
torrent_input: str,
download_dir: Path,
merged_labels: Optional[str],
downloader: Optional[str],
) -> tuple[Optional[str], Optional[str]]:
"""同步添加磁力下载任务,避免下载器调用阻塞事件循环。"""
result = DownloadChain().download(
content=torrent_input,
download_dir=download_dir,
cookie=None,
label=merged_labels,
downloader=downloader,
)
if result:
_, did, _, error_msg = result
else:
did, error_msg = None, "未找到下载器"
return did, error_msg
@staticmethod
def _download_single_sync(
context: Context,
downloader: Optional[str],
save_path: Optional[str],
merged_labels: Optional[str],
) -> tuple[Optional[str], Optional[str]]:
"""同步提交带上下文的下载任务,避免站点下载与下载器调用阻塞事件循环。"""
return DownloadChain().download_single(
context=context,
downloader=downloader,
save_path=save_path,
label=merged_labels,
return_detail=True,
)
async def run(self, torrent_url: Optional[List[str]] = None,
downloader: Optional[str] = None, save_path: Optional[str] = None,
labels: Optional[str] = None, **kwargs) -> str:
@@ -175,14 +235,13 @@ class AddDownloadTool(MoviePilotTool):
if not torrent_inputs:
return "错误torrent_url 不能为空。"
download_chain = DownloadChain()
merged_labels = self._merge_labels_with_system_tag(labels)
success_count = 0
failed_messages = []
for torrent_input in torrent_inputs:
if self._is_torrent_ref(torrent_input):
cached_context = self._resolve_cached_context(torrent_input)
cached_context = await self._async_resolve_cached_context(torrent_input)
if not cached_context or not cached_context.torrent_info:
failed_messages.append(f"{torrent_input} 引用无效,请重新使用 get_search_results 查看搜索结果")
continue
@@ -232,33 +291,33 @@ class AddDownloadTool(MoviePilotTool):
f"{torrent_input} 不是有效的下载内容,非 hash:id 时仅支持 magnet: 开头"
)
continue
download_dir = self._resolve_direct_download_dir(save_path)
download_dir = await self.run_blocking(
"storage", self._resolve_direct_download_dir, save_path
)
if not download_dir:
failed_messages.append(f"{torrent_input} 缺少保存路径,且系统未配置可用下载目录")
continue
result = download_chain.download(
content=torrent_input,
download_dir=download_dir,
cookie=None,
label=merged_labels,
downloader=downloader
did, error_msg = await self.run_blocking(
"downloader",
self._download_direct_sync,
torrent_input,
download_dir,
merged_labels,
downloader,
)
if result:
_, did, _, error_msg = result
else:
did, error_msg = None, "未找到下载器"
if did:
success_count += 1
else:
failed_messages.append(self._build_failure_message(torrent_input, error_msg))
continue
did, error_msg = download_chain.download_single(
context=context,
downloader=downloader,
save_path=save_path,
label=merged_labels,
return_detail=True
did, error_msg = await self.run_blocking(
"downloader",
self._download_single_sync,
context,
downloader,
save_path,
merged_labels,
)
if did:
success_count += 1

View File

@@ -49,6 +49,15 @@ class DeleteDownloadTool(MoviePilotTool):
return message
@staticmethod
def _delete_download_sync(
hash_value: str, downloader: Optional[str] = None, delete_files: bool = False
) -> bool:
"""同步删除下载任务,避免下载器客户端阻塞事件循环。"""
return DownloadChain().remove_torrents(
hashs=[hash_value], downloader=downloader, delete_file=delete_files
)
async def run(
self,
hash: str,
@@ -61,16 +70,18 @@ class DeleteDownloadTool(MoviePilotTool):
)
try:
download_chain = DownloadChain()
# 仅支持通过hash删除任务
if len(hash) != 40 or not all(c in "0123456789abcdefABCDEF" for c in hash):
return "参数错误hash 格式无效,请先使用 query_download_tasks 工具获取正确的 hash。"
# 删除下载任务
# remove_torrents 支持 delete_file 参数,可以控制是否删除文件
result = download_chain.remove_torrents(
hashs=[hash], downloader=downloader, delete_file=delete_files
result = await self.run_blocking(
"downloader",
self._delete_download_sync,
hash,
downloader,
bool(delete_files),
)
if result:

View File

@@ -49,8 +49,11 @@ class DeleteSubscribeTool(MoviePilotTool):
# 在删除之前获取订阅信息(用于事件)
subscribe_info = subscribe.to_dict()
# 删除订阅
subscribe_oper.delete(subscribe_id)
await subscribe_oper.async_delete(subscribe_id)
# 分享订阅统计刷新本身已异步化,这里只需要在删除后触发即可。
SubscribeHelper().sub_done_async(
{"tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid}
)
# 发送事件
await eventmanager.async_send_event(
@@ -58,11 +61,6 @@ class DeleteSubscribeTool(MoviePilotTool):
{"subscribe_id": subscribe_id, "subscribe_info": subscribe_info},
)
# 统计订阅
SubscribeHelper().sub_done_async(
{"tmdbid": subscribe.tmdbid, "doubanid": subscribe.doubanid}
)
return f"成功删除订阅:{subscribe.name} ({subscribe.year})"
except Exception as e:
logger.error(f"删除订阅失败: {e}", exc_info=True)

View File

@@ -37,21 +37,17 @@ class DeleteTransferHistoryTool(MoviePilotTool):
try:
transferhis = TransferHistoryOper()
# 查询历史记录是否存在
history = transferhis.get(history_id)
history = await transferhis.async_get(history_id)
if not history:
return f"错误整理历史记录不存在ID={history_id}"
# 保存信息用于返回
title = history.title or "未知"
src = history.src or "未知"
status = "成功" if history.status else "失败"
# 删除记录
transferhis.delete(history_id)
return f"已删除整理历史记录ID={history_id},标题={title},源路径={src},状态={status}"
await transferhis.async_delete(history_id)
return (
f"已删除整理历史记录ID={history_id},标题={title},源路径={src},状态={status}"
)
except Exception as e:
logger.error(f"删除整理历史记录失败: {e}", exc_info=True)
return f"删除整理历史记录时发生错误: {str(e)}"

View File

@@ -38,93 +38,81 @@ class ListDirectoryTool(MoviePilotTool):
return message
@staticmethod
def _list_directory_sync(
path: str, storage: Optional[str] = "local", sort_by: Optional[str] = "name"
) -> str:
"""
目录遍历可能触发本地磁盘或远程存储请求,统一放到线程池中执行。
"""
if not path:
return "错误:路径不能为空"
if storage == "local":
if not path.startswith("/") and not (len(path) > 1 and path[1] == ":"):
path = str(Path(path).resolve())
elif not path.startswith("/"):
path = "/" + path
fileitem = FileItem(storage=storage or "local", path=path, type="dir")
file_list = StorageChain().list_files(fileitem, recursion=False)
if file_list is None:
return f"无法访问目录:{path},请检查路径是否正确或存储是否可用"
if not file_list:
return f"目录 {path} 为空"
if sort_by == "time":
file_list.sort(key=lambda x: x.modify_time or 0, reverse=True)
else:
file_list.sort(
key=lambda x: (
0 if x.type == "dir" else 1,
StringUtils.natural_sort_key(x.name or ""),
)
)
total_count = len(file_list)
limited_list = file_list[:20]
simplified_items = []
for item in limited_list:
size_str = StringUtils.str_filesize(item.size) if item.size else None
modify_time_str = None
if item.modify_time:
try:
modify_time_str = datetime.fromtimestamp(item.modify_time).strftime(
"%Y-%m-%d %H:%M:%S"
)
except (ValueError, OSError):
modify_time_str = str(item.modify_time)
simplified = {
"name": item.name,
"type": item.type,
"path": item.path,
"size": size_str,
"modify_time": modify_time_str,
}
if item.type == "file" and item.extension:
simplified["extension"] = item.extension
simplified_items.append(simplified)
result_json = json.dumps(simplified_items, ensure_ascii=False, indent=2)
if total_count > 20:
return (
f"注意:目录中共有 {total_count} 个项目,为节省上下文空间,仅显示前 20 个项目。\n\n"
f"{result_json}"
)
return result_json
async def run(self, path: str, storage: Optional[str] = "local",
sort_by: Optional[str] = "name", **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: path={path}, storage={storage}, sort_by={sort_by}")
try:
# 规范化路径
if not path:
return "错误:路径不能为空"
# 确保路径格式正确
if storage == "local":
# 本地路径处理
if not path.startswith("/") and not (len(path) > 1 and path[1] == ":"):
# 相对路径,尝试转换为绝对路径
path = str(Path(path).resolve())
else:
# 远程存储路径,确保以/开头
if not path.startswith("/"):
path = "/" + path
# 创建FileItem
fileitem = FileItem(
storage=storage or "local",
path=path,
type="dir"
return await self.run_blocking(
"storage", self._list_directory_sync, path, storage, sort_by
)
# 查询目录内容
storage_chain = StorageChain()
file_list = storage_chain.list_files(fileitem, recursion=False)
if file_list is None:
return f"无法访问目录:{path},请检查路径是否正确或存储是否可用"
if not file_list:
return f"目录 {path} 为空"
# 排序
if sort_by == "time":
file_list.sort(key=lambda x: x.modify_time or 0, reverse=True)
else:
# 默认按名称排序(目录优先,然后按名称)
file_list.sort(key=lambda x: (
0 if x.type == "dir" else 1,
StringUtils.natural_sort_key(x.name or "")
))
# 限制返回数量
total_count = len(file_list)
limited_list = file_list[:20]
# 转换为字典格式
simplified_items = []
for item in limited_list:
# 格式化文件大小
size_str = None
if item.size:
size_str = StringUtils.str_filesize(item.size)
# 格式化修改时间
modify_time_str = None
if item.modify_time:
try:
modify_time_str = datetime.fromtimestamp(item.modify_time).strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
modify_time_str = str(item.modify_time)
simplified = {
"name": item.name,
"type": item.type,
"path": item.path,
"size": size_str,
"modify_time": modify_time_str
}
# 如果是文件,添加扩展名
if item.type == "file" and item.extension:
simplified["extension"] = item.extension
simplified_items.append(simplified)
result_json = json.dumps(simplified_items, ensure_ascii=False, indent=2)
# 如果结果被裁剪,添加提示信息
if total_count > 100:
return f"注意:目录中共有 {total_count} 个项目,为节省上下文空间,仅显示前 100 个项目。\n\n{result_json}"
else:
return result_json
except Exception as e:
logger.error(f"查询目录内容失败: {e}", exc_info=True)
return f"查询目录内容时发生错误: {str(e)}"

View File

@@ -66,6 +66,38 @@ class ModifyDownloadTool(MoviePilotTool):
parts.append(f"下载器: {downloader}")
return " | ".join(parts)
@staticmethod
def _modify_download_sync(
hash_value: str,
action: Optional[str] = None,
tags: Optional[List[str]] = None,
downloader: Optional[str] = None,
) -> List[str]:
"""同步修改下载任务状态和标签,避免下载器 SDK 阻塞事件循环。"""
download_chain = DownloadChain()
results = []
if tags:
tag_result = download_chain.set_torrents_tag(
hashs=[hash_value], tags=tags, downloader=downloader
)
if tag_result:
results.append(f"成功设置标签:{', '.join(tags)}")
else:
results.append("设置标签失败,请检查任务是否存在或下载器是否可用")
if action:
action_result = download_chain.set_downloading(
hash_str=hash_value, oper=action, name=downloader
)
action_desc = "开始" if action == "start" else "暂停"
if action_result:
results.append(f"成功{action_desc}下载任务")
else:
results.append(f"{action_desc}下载任务失败,请检查任务是否存在或下载器是否可用")
return results
async def run(
self,
hash: str,
@@ -91,31 +123,14 @@ class ModifyDownloadTool(MoviePilotTool):
if action and action not in ("start", "stop"):
return f"参数错误action 只支持 'start'(开始下载)或 'stop'(暂停下载),收到: '{action}'"
download_chain = DownloadChain()
results = []
# 设置标签
if tags:
tag_result = download_chain.set_torrents_tag(
hashs=[hash], tags=tags, downloader=downloader
)
if tag_result:
results.append(f"成功设置标签:{', '.join(tags)}")
else:
results.append(f"设置标签失败,请检查任务是否存在或下载器是否可用")
# 执行开始/暂停操作
if action:
action_result = download_chain.set_downloading(
hash_str=hash, oper=action, name=downloader
)
action_desc = "开始" if action == "start" else "暂停"
if action_result:
results.append(f"成功{action_desc}下载任务")
else:
results.append(
f"{action_desc}下载任务失败,请检查任务是否存在或下载器是否可用"
)
results = await self.run_blocking(
"downloader",
self._modify_download_sync,
hash,
action,
tags,
downloader,
)
return f"下载任务 {hash}" + "".join(results)

View File

@@ -33,11 +33,15 @@ class QueryCustomIdentifiersTool(MoviePilotTool):
"""生成友好的提示消息"""
return "查询自定义识别词"
@staticmethod
def _load_custom_identifiers():
"""从内存配置缓存中读取自定义识别词。"""
return SystemConfigOper().get(SystemConfigKey.CustomIdentifiers)
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:
system_config_oper = SystemConfigOper()
identifiers = system_config_oper.get(SystemConfigKey.CustomIdentifiers)
identifiers = self._load_custom_identifiers()
if identifiers:
return json.dumps(
{

View File

@@ -47,88 +47,93 @@ class QueryDirectorySettingsTool(MoviePilotTool):
return " | ".join(parts) if len(parts) > 1 else parts[0]
@staticmethod
def _query_directory_settings(
directory_type: Optional[str] = "all",
storage_type: Optional[str] = "all",
name: Optional[str] = None,
) -> str:
"""
目录配置完全来自内存配置缓存,这里只做本地过滤和序列化。
"""
directory_helper = DirectoryHelper()
if directory_type == "download":
dirs = directory_helper.get_download_dirs()
elif directory_type == "library":
dirs = directory_helper.get_library_dirs()
else:
dirs = directory_helper.get_dirs()
filtered_dirs = []
for d in dirs:
if storage_type == "local":
if directory_type == "download" and d.storage != "local":
continue
if directory_type == "library" and d.library_storage != "local":
continue
if directory_type == "all":
if d.download_path and d.storage != "local":
continue
if d.library_path and d.library_storage != "local":
continue
elif storage_type == "remote":
if directory_type == "download" and d.storage == "local":
continue
if directory_type == "library" and d.library_storage == "local":
continue
if directory_type == "all":
if d.download_path and d.storage == "local":
continue
if d.library_path and d.library_storage == "local":
continue
if name and d.name and name.lower() not in d.name.lower():
continue
filtered_dirs.append(d)
if not filtered_dirs:
return "未找到相关目录配置"
simplified_dirs = []
for d in filtered_dirs:
simplified_dirs.append(
{
"name": d.name,
"priority": d.priority,
"storage": d.storage,
"download_path": d.download_path,
"library_path": d.library_path,
"library_storage": d.library_storage,
"media_type": d.media_type,
"media_category": d.media_category,
"monitor_type": d.monitor_type,
"monitor_mode": d.monitor_mode,
"transfer_type": d.transfer_type,
"overwrite_mode": d.overwrite_mode,
"renaming": d.renaming,
"scraping": d.scraping,
"notify": d.notify,
"download_type_folder": d.download_type_folder,
"download_category_folder": d.download_category_folder,
"library_type_folder": d.library_type_folder,
"library_category_folder": d.library_category_folder,
}
)
return json.dumps(simplified_dirs, ensure_ascii=False, indent=2)
async def run(self, directory_type: Optional[str] = "all",
storage_type: Optional[str] = "all",
name: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: directory_type={directory_type}, storage_type={storage_type}, name={name}")
try:
directory_helper = DirectoryHelper()
# 根据目录类型获取目录列表
if directory_type == "download":
dirs = directory_helper.get_download_dirs()
elif directory_type == "library":
dirs = directory_helper.get_library_dirs()
else:
dirs = directory_helper.get_dirs()
# 按存储类型过滤
filtered_dirs = []
for d in dirs:
# 按存储类型过滤
if storage_type == "local":
# 对于下载目录,检查 storage对于媒体库目录检查 library_storage
if directory_type == "download" and d.storage != "local":
continue
elif directory_type == "library" and d.library_storage != "local":
continue
elif directory_type == "all":
# 检查是否有本地存储配置
if d.download_path and d.storage != "local":
continue
if d.library_path and d.library_storage != "local":
continue
elif storage_type == "remote":
# 对于下载目录,检查 storage对于媒体库目录检查 library_storage
if directory_type == "download" and d.storage == "local":
continue
elif directory_type == "library" and d.library_storage == "local":
continue
elif directory_type == "all":
# 检查是否有远程存储配置
if d.download_path and d.storage == "local":
continue
if d.library_path and d.library_storage == "local":
continue
# 按名称过滤(部分匹配)
if name and d.name and name.lower() not in d.name.lower():
continue
filtered_dirs.append(d)
if filtered_dirs:
# 转换为字典格式,只保留关键信息
simplified_dirs = []
for d in filtered_dirs:
simplified = {
"name": d.name,
"priority": d.priority,
"storage": d.storage,
"download_path": d.download_path,
"library_path": d.library_path,
"library_storage": d.library_storage,
"media_type": d.media_type,
"media_category": d.media_category,
"monitor_type": d.monitor_type,
"monitor_mode": d.monitor_mode,
"transfer_type": d.transfer_type,
"overwrite_mode": d.overwrite_mode,
"renaming": d.renaming,
"scraping": d.scraping,
"notify": d.notify,
"download_type_folder": d.download_type_folder,
"download_category_folder": d.download_category_folder,
"library_type_folder": d.library_type_folder,
"library_category_folder": d.library_category_folder
}
simplified_dirs.append(simplified)
result_json = json.dumps(simplified_dirs, ensure_ascii=False, indent=2)
return result_json
return "未找到相关目录配置"
return self._query_directory_settings(
directory_type=directory_type,
storage_type=storage_type,
name=name,
)
except Exception as e:
logger.error(f"查询系统目录设置失败: {e}", exc_info=True)
return f"查询系统目录设置时发生错误: {str(e)}"

View File

@@ -1,7 +1,7 @@
"""查询下载工具"""
import json
from typing import Optional, Type, List, Union
from typing import Any, Dict, List, Optional, Type, Union
from pydantic import BaseModel, Field
@@ -64,6 +64,126 @@ class QueryDownloadTasksTool(MoviePilotTool):
except (TypeError, ValueError):
return None
@staticmethod
def _apply_download_history(
torrent: Union[TransferTorrent, DownloadingTorrent], history: Any
) -> None:
"""将下载历史中的补充信息回填到下载任务结果中。"""
if not history:
return
if hasattr(torrent, "media"):
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
if hasattr(torrent, "username"):
torrent.username = history.username
torrent.userid = history.userid
@classmethod
def _load_history_map(
cls, torrents: List[Union[TransferTorrent, DownloadingTorrent]]
) -> Dict[str, Any]:
"""批量加载下载历史,避免逐条查询形成 N+1。"""
hashes = [torrent.hash for torrent in torrents if getattr(torrent, "hash", None)]
if not hashes:
return {}
return DownloadHistoryOper().get_by_hashes(hashes)
@classmethod
def _query_downloads_sync(
cls,
downloader: Optional[str] = None,
status: Optional[str] = "all",
hash_value: Optional[str] = None,
title: Optional[str] = None,
tag: Optional[str] = None,
) -> Dict[str, Any]:
"""
同步查询下载器和下载历史,整个链路放在线程池中执行。
"""
download_chain = DownloadChain()
if hash_value:
torrents = (
download_chain.list_torrents(downloader=downloader, hashs=[hash_value])
or []
)
if not torrents:
return {
"message": f"未找到hash为 {hash_value} 的下载任务(该任务可能已完成、已删除或不存在)"
}
history_map = cls._load_history_map(torrents)
for torrent in torrents:
cls._apply_download_history(torrent, history_map.get(torrent.hash))
filtered_downloads = list(torrents)
elif title:
all_torrents = cls._get_all_torrents(download_chain, downloader)
history_map = cls._load_history_map(all_torrents)
filtered_downloads = []
title_lower = title.lower()
for torrent in all_torrents:
history = history_map.get(torrent.hash)
matched = title_lower in (torrent.title or "").lower() or title_lower in (
getattr(torrent, "name", None) or ""
).lower()
if not matched and history and history.title:
matched = title_lower in history.title.lower()
if not matched:
continue
cls._apply_download_history(torrent, history)
filtered_downloads.append(torrent)
if not filtered_downloads:
return {"message": f"未找到标题包含 '{title}' 的下载任务"}
else:
if status == "downloading":
downloads = download_chain.downloading(name=downloader) or []
filtered_downloads = [
dl
for dl in downloads
if not downloader or dl.downloader == downloader
]
else:
all_torrents = cls._get_all_torrents(download_chain, downloader)
filtered_downloads = []
for torrent in all_torrents:
if downloader and torrent.downloader != downloader:
continue
if status == "completed" and torrent.state not in [
"seeding",
"completed",
]:
continue
if status == "paused" and torrent.state != "paused":
continue
filtered_downloads.append(torrent)
history_map = cls._load_history_map(filtered_downloads)
for torrent in filtered_downloads:
cls._apply_download_history(torrent, history_map.get(torrent.hash))
if tag and filtered_downloads:
tag_lower = tag.lower()
filtered_downloads = [
d for d in filtered_downloads if d.tags and tag_lower in d.tags.lower()
]
if not filtered_downloads:
return {"message": f"未找到标签包含 '{tag}' 的下载任务"}
if not filtered_downloads:
return {"message": "未找到相关下载任务"}
return {"downloads": filtered_downloads}
def get_tool_message(self, **kwargs) -> Optional[str]:
"""根据查询参数生成友好的提示消息"""
downloader = kwargs.get("downloader")
@@ -98,124 +218,19 @@ class QueryDownloadTasksTool(MoviePilotTool):
tag: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: downloader={downloader}, status={status}, hash={hash}, title={title}, tag={tag}")
try:
download_chain = DownloadChain()
# 如果提供了hash直接查询该hash的任务不限制状态
if hash:
torrents = download_chain.list_torrents(downloader=downloader, hashs=[hash]) or []
if not torrents:
return f"未找到hash为 {hash} 的下载任务(该任务可能已完成、已删除或不存在)"
# 转换为DownloadingTorrent格式
downloads = []
for torrent in torrents:
# 获取下载历史信息
history = DownloadHistoryOper().get_by_hash(torrent.hash)
if history:
if hasattr(torrent, "media"):
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
if hasattr(torrent, "username"):
torrent.username = history.username
torrent.userid = history.userid
downloads.append(torrent)
filtered_downloads = downloads
elif title:
# 如果提供了title查询所有任务并搜索匹配的标题
# 查询所有状态的任务
all_torrents = self._get_all_torrents(download_chain, downloader)
filtered_downloads = []
title_lower = title.lower()
for torrent in all_torrents:
# 获取下载历史信息
history = DownloadHistoryOper().get_by_hash(torrent.hash)
# 检查标题或名称是否匹配(包括下载历史中的标题)
matched = False
# 检查torrent的title和name字段
if (title_lower in (torrent.title or "").lower()) or \
(title_lower in (getattr(torrent, "name", None) or "").lower()):
matched = True
# 检查下载历史中的标题
if history and history.title:
if title_lower in history.title.lower():
matched = True
if matched:
if history:
if hasattr(torrent, "media"):
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
if hasattr(torrent, "username"):
torrent.username = history.username
torrent.userid = history.userid
filtered_downloads.append(torrent)
if not filtered_downloads:
return f"未找到标题包含 '{title}' 的下载任务"
else:
# 根据status决定查询方式
if status == "downloading":
# 如果status为下载中使用downloading方法
downloads = download_chain.downloading(name=downloader) or []
filtered_downloads = []
for dl in downloads:
if downloader and dl.downloader != downloader:
continue
filtered_downloads.append(dl)
else:
# 其他状态completed、paused、all使用list_torrents查询所有任务
# 查询所有状态的任务
all_torrents = self._get_all_torrents(download_chain, downloader)
filtered_downloads = []
for torrent in all_torrents:
if downloader and torrent.downloader != downloader:
continue
# 根据status过滤
if status == "completed":
# 已完成的任务state为seeding或completed
if torrent.state not in ["seeding", "completed"]:
continue
elif status == "paused":
# 已暂停的任务
if torrent.state != "paused":
continue
# status == "all" 时不过滤
# 获取下载历史信息
history = DownloadHistoryOper().get_by_hash(torrent.hash)
if history:
if hasattr(torrent, "media"):
torrent.media = {
"tmdbid": history.tmdbid,
"type": history.type,
"title": history.title,
"season": history.seasons,
"episode": history.episodes,
"image": history.image,
}
if hasattr(torrent, "username"):
torrent.username = history.username
torrent.userid = history.userid
filtered_downloads.append(torrent)
# 按tag过滤
if tag and filtered_downloads:
tag_lower = tag.lower()
filtered_downloads = [
d for d in filtered_downloads
if d.tags and tag_lower in d.tags.lower()
]
if not filtered_downloads:
return f"未找到标签包含 '{tag}' 的下载任务"
payload = await self.run_blocking(
"downloader",
self._query_downloads_sync,
downloader,
status,
hash,
title,
tag,
)
if payload.get("message"):
return payload["message"]
filtered_downloads = payload.get("downloads") or []
if filtered_downloads:
# 限制最多20条结果
total_count = len(filtered_downloads)

View File

@@ -25,11 +25,15 @@ class QueryDownloadersTool(MoviePilotTool):
"""生成友好的提示消息"""
return "查询下载器配置"
@staticmethod
def _load_downloaders_config():
"""从内存配置缓存中读取下载器配置。"""
return SystemConfigOper().get(SystemConfigKey.Downloaders)
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:
system_config_oper = SystemConfigOper()
downloaders_config = system_config_oper.get(SystemConfigKey.Downloaders)
downloaders_config = self._load_downloaders_config()
if downloaders_config:
return json.dumps(downloaders_config, ensure_ascii=False, indent=2)
return "未配置下载器。"

View File

@@ -33,32 +33,32 @@ class QueryInstalledPluginsTool(MoviePilotTool):
"""生成友好的提示消息"""
return "查询已安装插件"
@staticmethod
def _list_installed_plugins() -> list[dict]:
"""读取已加载插件的内存快照。"""
plugin_manager = PluginManager()
local_plugins = plugin_manager.get_local_plugins()
installed_plugins = [plugin for plugin in local_plugins if plugin.installed]
return [
{
"id": plugin.id,
"plugin_name": plugin.plugin_name,
"plugin_desc": plugin.plugin_desc,
"plugin_version": plugin.plugin_version,
"plugin_author": plugin.plugin_author,
"state": plugin.state,
"has_page": plugin.has_page,
}
for plugin in installed_plugins
]
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:
plugin_manager = PluginManager()
local_plugins = plugin_manager.get_local_plugins()
# 仅返回已安装的插件
installed_plugins = [plugin for plugin in local_plugins if plugin.installed]
installed_plugins = self._list_installed_plugins()
if not installed_plugins:
return "当前没有已安装的插件"
plugins_list = []
for plugin in installed_plugins:
plugins_list.append(
{
"id": plugin.id,
"plugin_name": plugin.plugin_name,
"plugin_desc": plugin.plugin_desc,
"plugin_version": plugin.plugin_version,
"plugin_author": plugin.plugin_author,
"state": plugin.state,
"has_page": plugin.has_page,
}
)
result_json = json.dumps(plugins_list, ensure_ascii=False, indent=2)
result_json = json.dumps(installed_plugins, ensure_ascii=False, indent=2)
return result_json
except Exception as e:
logger.error(f"查询已安装插件失败: {e}", exc_info=True)

View File

@@ -1,5 +1,6 @@
"""查询媒体库工具"""
import asyncio
import json
from collections import OrderedDict
from typing import Optional, Type, Any
@@ -102,6 +103,16 @@ class QueryLibraryExistsTool(MoviePilotTool):
message += f" [{media_type}]"
return message
@staticmethod
def _get_media_server_names() -> list[str]:
"""同步读取已加载媒体服务器名称。"""
return sorted(MediaServerHelper().get_services().keys())
@staticmethod
def _query_media_exists(mediainfo, server: Optional[str] = None):
"""同步查询单个媒体服务器的存在性信息。"""
return MediaServerChain().media_exists(mediainfo=mediainfo, server=server)
async def run(self, tmdb_id: Optional[int] = None, douban_id: Optional[str] = None,
media_type: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: tmdb_id={tmdb_id}, douban_id={douban_id}, media_type={media_type}")
@@ -116,7 +127,7 @@ class QueryLibraryExistsTool(MoviePilotTool):
return f"错误:无效的媒体类型 '{media_type}',支持的类型:'movie', 'tv'"
media_chain = MediaServerChain()
mediainfo = media_chain.recognize_media(
mediainfo = await media_chain.async_recognize_media(
tmdbid=tmdb_id,
doubanid=douban_id,
mtype=media_type_enum,
@@ -127,12 +138,22 @@ class QueryLibraryExistsTool(MoviePilotTool):
# 2. 遍历所有媒体服务器,分别查询存在性信息
server_results = OrderedDict()
media_server_helper = MediaServerHelper()
total_seasons = _filter_regular_seasons(mediainfo.seasons)
global_existsinfo = media_chain.media_exists(mediainfo=mediainfo)
service_names = self._get_media_server_names()
for service_name in sorted(media_server_helper.get_services().keys()):
existsinfo = media_chain.media_exists(mediainfo=mediainfo, server=service_name)
server_checks = await asyncio.gather(
*[
self.run_blocking(
"mediaserver",
self._query_media_exists,
mediainfo,
service_name,
)
for service_name in service_names
]
)
for service_name, existsinfo in zip(service_names, server_checks):
if not existsinfo:
continue
@@ -147,21 +168,23 @@ class QueryLibraryExistsTool(MoviePilotTool):
"exists": True
}
if global_existsinfo:
fallback_server_name = global_existsinfo.server or "local"
if fallback_server_name not in server_results:
if global_existsinfo.type == MediaType.TV:
server_results[fallback_server_name] = _build_tv_server_result(
existing_seasons=_filter_regular_seasons(global_existsinfo.seasons),
total_seasons=total_seasons
)
else:
server_results[fallback_server_name] = {
"exists": True
}
if not server_results:
return "媒体库中未找到相关媒体"
global_existsinfo = await self.run_blocking(
"mediaserver", self._query_media_exists, mediainfo, None
)
if not global_existsinfo:
return "媒体库中未找到相关媒体"
fallback_server_name = global_existsinfo.server or "local"
if global_existsinfo.type == MediaType.TV:
server_results[fallback_server_name] = _build_tv_server_result(
existing_seasons=_filter_regular_seasons(global_existsinfo.seasons),
total_seasons=total_seasons
)
else:
server_results[fallback_server_name] = {
"exists": True
}
# 3. 组装统一的存在性结果,不查询媒体服务器详情
result_dict = {

View File

@@ -1,5 +1,6 @@
"""查询媒体服务器最近入库影片工具"""
import asyncio
import json
from typing import Optional, Type
@@ -50,6 +51,32 @@ class QueryLibraryLatestTool(MoviePilotTool):
return " | ".join(parts)
@staticmethod
def _get_enabled_servers() -> list[str]:
"""同步读取启用的媒体服务器列表。"""
mediaservers = ServiceConfigHelper.get_mediaserver_configs()
return [ms.name for ms in mediaservers if ms.enabled]
@staticmethod
def _load_latest_items(
server_name: str, count: int, username: Optional[str] = None
) -> list[dict]:
"""
媒体服务器 SDK 和 requests 调用都是同步的,这里在线程池中转换为可序列化结果。
"""
latest_items = MediaServerChain().latest(
server=server_name, count=count, username=username
)
if not latest_items:
return []
return [
{
**item.model_dump(exclude_none=True),
"server": server_name,
}
for item in latest_items
]
async def run(
self, server: Optional[str] = None, page: Optional[int] = 1, **kwargs
) -> str:
@@ -58,37 +85,34 @@ class QueryLibraryLatestTool(MoviePilotTool):
fetch_count = page * PAGE_SIZE
logger.info(f"执行工具: {self.name}, 参数: server={server}, page={page}")
try:
media_chain = MediaServerChain()
results = []
# 如果没有指定服务器,获取所有启用的媒体服务器
if not server:
mediaservers = ServiceConfigHelper.get_mediaserver_configs()
enabled_servers = [ms.name for ms in mediaservers if ms.enabled]
enabled_servers = self._get_enabled_servers()
if not enabled_servers:
return "未找到启用的媒体服务器"
# 遍历所有启用的服务器
for server_name in enabled_servers:
latest_items = media_chain.latest(
server=server_name, count=fetch_count, username=self._username
)
if latest_items:
for item in latest_items:
item_dict = item.model_dump(exclude_none=True)
item_dict["server"] = server_name
results.append(item_dict)
else:
# 查询指定服务器
latest_items = media_chain.latest(
server=server, count=fetch_count, username=self._username
server_results = await asyncio.gather(
*[
self.run_blocking(
"mediaserver",
self._load_latest_items,
server_name,
fetch_count,
self._username,
)
for server_name in enabled_servers
]
)
results = [
item for items in server_results for item in items if items
]
else:
results = await self.run_blocking(
"mediaserver",
self._load_latest_items,
server,
fetch_count,
self._username,
)
if latest_items:
for item in latest_items:
item_dict = item.model_dump(exclude_none=True)
item_dict["server"] = server
results.append(item_dict)
if not results:
server_info = f"服务器 {server}" if server else "所有服务器"

View File

@@ -43,70 +43,68 @@ class QueryPluginCapabilitiesTool(MoviePilotTool):
return f"查询插件 {plugin_id} 的能力"
return "查询所有插件的能力"
async def run(self, plugin_id: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: plugin_id={plugin_id}")
try:
plugin_manager = PluginManager()
result = {}
@staticmethod
def _load_plugin_capabilities(plugin_id: Optional[str] = None) -> dict:
"""读取运行中插件实例暴露的内存能力信息。"""
plugin_manager = PluginManager()
result = {}
# 获取插件命令
commands = plugin_manager.get_plugin_commands(pid=plugin_id)
if commands:
commands_list = []
for cmd in commands:
cmd_info = {
"cmd": cmd.get("cmd"),
"desc": cmd.get("desc"),
"plugin_id": cmd.get("pid"),
}
# data 字段可能包含额外参数信息
if cmd.get("data"):
cmd_info["data"] = cmd.get("data")
commands_list.append(cmd_info)
result["commands"] = commands_list
commands = plugin_manager.get_plugin_commands(pid=plugin_id)
if commands:
result["commands"] = [
{
"cmd": cmd.get("cmd"),
"desc": cmd.get("desc"),
"plugin_id": cmd.get("pid"),
**({"data": cmd.get("data")} if cmd.get("data") else {}),
}
for cmd in commands
]
# 获取插件动作
actions = plugin_manager.get_plugin_actions(pid=plugin_id)
if actions:
actions_list = []
for action_group in actions:
plugin_actions = {
actions = plugin_manager.get_plugin_actions(pid=plugin_id)
if actions:
actions_list = []
for action_group in actions:
actions_list.append(
{
"plugin_id": action_group.get("plugin_id"),
"plugin_name": action_group.get("plugin_name"),
"actions": [],
}
for action in action_group.get("actions", []):
plugin_actions["actions"].append(
"actions": [
{
"id": action.get("id"),
"name": action.get("name"),
}
)
actions_list.append(plugin_actions)
result["actions"] = actions_list
# 获取插件定时服务
services = plugin_manager.get_plugin_services(pid=plugin_id)
if services:
services_list = []
for svc in services:
svc_info = {
"id": svc.get("id"),
"name": svc.get("name"),
for action in action_group.get("actions", [])
],
}
# 包含触发器信息
trigger = svc.get("trigger")
if trigger:
svc_info["trigger"] = str(trigger)
# 包含定时器参数
svc_kwargs = svc.get("kwargs")
if svc_kwargs:
svc_info["trigger_kwargs"] = {
k: str(v) for k, v in svc_kwargs.items()
}
services_list.append(svc_info)
result["services"] = services_list
)
result["actions"] = actions_list
services = plugin_manager.get_plugin_services(pid=plugin_id)
if services:
services_list = []
for svc in services:
svc_info = {
"id": svc.get("id"),
"name": svc.get("name"),
}
trigger = svc.get("trigger")
if trigger:
svc_info["trigger"] = str(trigger)
svc_kwargs = svc.get("kwargs")
if svc_kwargs:
svc_info["trigger_kwargs"] = {
k: str(v) for k, v in svc_kwargs.items()
}
services_list.append(svc_info)
result["services"] = services_list
return result
async def run(self, plugin_id: Optional[str] = None, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: plugin_id={plugin_id}")
try:
result = self._load_plugin_capabilities(plugin_id)
if not result:
if plugin_id:
return f"插件 {plugin_id} 没有注册任何命令、动作或定时服务"

View File

@@ -24,36 +24,35 @@ class QueryRuleGroupsTool(MoviePilotTool):
"""根据查询参数生成友好的提示消息"""
return "查询所有规则组"
@staticmethod
def _load_rule_groups() -> dict:
"""从内存配置缓存中读取规则组。"""
rule_groups = RuleHelper().get_rule_groups()
if not rule_groups:
return {
"message": "未找到任何规则组",
"rule_groups": [],
}
simplified_groups = [
{
"name": group.name,
"media_type": group.media_type,
"category": group.category,
}
for group in rule_groups
]
return {
"message": f"找到 {len(simplified_groups)} 个规则组",
"rule_groups": simplified_groups,
}
async def run(self, **kwargs) -> str:
logger.info(f"执行工具: {self.name}")
try:
rule_helper = RuleHelper()
rule_groups = rule_helper.get_rule_groups()
if not rule_groups:
return json.dumps({
"message": "未找到任何规则组",
"rule_groups": []
}, ensure_ascii=False, indent=2)
# 精简字段,过滤掉 rule_string 避免结果过大
simplified_groups = []
for group in rule_groups:
simplified = {
"name": group.name,
"media_type": group.media_type,
"category": group.category
}
simplified_groups.append(simplified)
result = {
"message": f"找到 {len(simplified_groups)} 个规则组",
"rule_groups": simplified_groups
}
result = self._load_rule_groups()
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
error_message = f"查询规则组失败: {str(e)}"
logger.error(f"查询规则组失败: {e}", exc_info=True)

View File

@@ -33,27 +33,26 @@ class RunSchedulerTool(MoviePilotTool):
job_id = kwargs.get("job_id", "")
return f"运行定时服务 (ID: {job_id})"
@staticmethod
def _run_scheduler_sync(job_id: str) -> tuple[bool, str]:
"""同步触发定时服务,避免调度器扫描阻塞事件循环。"""
scheduler = Scheduler()
for scheduler_item in scheduler.list():
if scheduler_item.id == job_id:
scheduler.start(job_id)
return True, scheduler_item.name
return False, ""
async def run(self, job_id: str, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: job_id={job_id}")
try:
scheduler = Scheduler()
# 检查定时服务是否存在
schedulers = scheduler.list()
job_exists = False
job_name = None
for s in schedulers:
if s.id == job_id:
job_exists = True
job_name = s.name
break
job_exists, job_name = await self.run_blocking(
"workflow", self._run_scheduler_sync, job_id
)
if not job_exists:
return f"定时服务 ID {job_id} 不存在,请使用 query_schedulers 工具查询可用的定时服务"
# 运行定时服务
scheduler.start(job_id)
return f"成功触发定时服务:{job_name} (ID: {job_id})"
except Exception as e:
logger.error(f"运行定时服务失败: {e}", exc_info=True)

View File

@@ -46,6 +46,13 @@ class RunWorkflowTool(MoviePilotTool):
return message
@staticmethod
def _run_workflow_sync(
workflow_id: int, from_begin: Optional[bool] = True
) -> tuple[bool, str]:
"""同步执行工作流,放到专用线程池避免长流程阻塞 API 响应。"""
return WorkflowChain().process(workflow_id, from_begin=from_begin)
async def run(
self, workflow_id: int, from_begin: Optional[bool] = True, **kwargs
) -> str:
@@ -62,10 +69,12 @@ class RunWorkflowTool(MoviePilotTool):
if not workflow:
return f"未找到工作流:{workflow_id},请使用 query_workflows 工具查询可用的工作流"
# 执行工作流
workflow_chain = WorkflowChain()
state, errmsg = workflow_chain.process(
workflow.id, from_begin=from_begin
# 工作流执行链路包含大量同步步骤,统一放到 workflow 线程池。
state, errmsg = await self.run_blocking(
"workflow",
self._run_workflow_sync,
workflow.id,
from_begin,
)
if not state:

View File

@@ -8,7 +8,6 @@ from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.media import MediaChain
from app.core.config import global_vars
from app.core.metainfo import MetaInfoPath
from app.log import logger
from app.schemas import FileItem
@@ -104,15 +103,14 @@ class ScrapeMetadataTool(MoviePilotTool):
ensure_ascii=False,
)
# 在线程池中执行同步的刮削操作
await global_vars.loop.run_in_executor(
None,
lambda: media_chain.scrape_metadata(
fileitem=fileitem,
meta=meta,
mediainfo=mediainfo,
overwrite=overwrite,
),
# 刮削会包含磁盘写入和外部图片/元数据访问,统一放到 storage 线程池。
await self.run_blocking(
"storage",
media_chain.scrape_metadata,
fileitem=fileitem,
meta=meta,
mediainfo=mediainfo,
overwrite=overwrite,
)
return json.dumps(

View File

@@ -7,7 +7,6 @@ from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.chain.subscribe import SubscribeChain
from app.core.config import global_vars
from app.db.subscribe_oper import SubscribeOper
from app.log import logger
from app.schemas.types import media_type_to_agent
@@ -81,19 +80,13 @@ class SearchSubscribeTool(MoviePilotTool):
subscribe_oper.update(subscribe_id, {"filter_groups": filter_groups})
logger.info(f"更新订阅 #{subscribe_id} 的规则组为: {filter_groups}")
# 调用 SubscribeChain 的 search 方法
# search 方法是同步的,需要在异步环境中运行
subscribe_chain = SubscribeChain()
# 在线程池中执行同步的搜索操作
# 当 sid 有值时state 参数会被忽略,直接处理该订阅
await global_vars.loop.run_in_executor(
None,
lambda: subscribe_chain.search(
sid=subscribe_id,
state='R', # 默认状态,当 sid 有值时此参数会被忽略
manual=manual
)
# 订阅搜索会触发大量同步站点访问,统一走 subscribe 线程池。
await self.run_blocking(
"subscribe",
SubscribeChain().search,
sid=subscribe_id,
state="R", # 当 sid 有值时此参数会被忽略
manual=manual,
)
# 重新获取订阅信息以获取更新后的状态

View File

@@ -50,6 +50,11 @@ class SearchTorrentsTool(MoviePilotTool):
message += f" [{media_type}]"
return message
@staticmethod
def _load_configured_sites() -> List[int]:
"""同步读取默认搜索站点列表。"""
return SystemConfigOper().get(SystemConfigKey.IndexerSites) or []
async def run(self, tmdb_id: Optional[int] = None, douban_id: Optional[str] = None,
media_type: Optional[str] = None, area: Optional[str] = None,
sites: Optional[List[int]] = None, **kwargs) -> str:
@@ -83,8 +88,7 @@ class SearchTorrentsTool(MoviePilotTool):
if sites:
search_site_ids = sites
else:
configured_sites = SystemConfigOper().get(SystemConfigKey.IndexerSites)
search_site_ids = configured_sites if configured_sites else []
search_site_ids = self._load_configured_sites()
if filtered_torrents:
await search_chain.async_save_cache(filtered_torrents, SEARCH_RESULT_CACHE_FILE)

View File

@@ -26,24 +26,29 @@ class TestSiteTool(MoviePilotTool):
site_identifier = kwargs.get("site_identifier")
return f"测试站点连通性: {site_identifier}"
@staticmethod
def _test_site_sync(site_identifier: int) -> tuple[Optional[str], Optional[str], bool, str]:
"""在同步线程里执行站点联通测试,避免网络请求卡住事件循环。"""
site = SiteOper().get(site_identifier)
if not site:
return None, None, False, f"未找到站点:{site_identifier},请使用 query_sites 工具查询可用的站点"
status, message = SiteChain().test(site.domain)
return site.name, site.domain, status, message
async def run(self, site_identifier: int, **kwargs) -> str:
logger.info(f"执行工具: {self.name}, 参数: site_identifier={site_identifier}")
try:
site_oper = SiteOper()
site_chain = SiteChain()
site = await site_oper.async_get(site_identifier)
if not site:
return f"未找到站点:{site_identifier},请使用 query_sites 工具查询可用的站点"
# 测试站点连通性
status, message = site_chain.test(site.domain)
site_name, site_domain, status, message = await self.run_blocking(
"site", self._test_site_sync, site_identifier
)
if not site_name:
return message
if status:
return f"站点连通性测试成功:{site.name} ({site.domain})\n{message}"
return f"站点连通性测试成功:{site_name} ({site_domain})\n{message}"
else:
return f"站点连通性测试失败:{site.name} ({site.domain})\n{message}"
return f"站点连通性测试失败:{site_name} ({site_domain})\n{message}"
except Exception as e:
logger.error(f"测试站点连通性失败: {e}", exc_info=True)
return f"测试站点连通性时发生错误: {str(e)}"

View File

@@ -84,6 +84,73 @@ class TransferFileTool(MoviePilotTool):
return message
@staticmethod
def _transfer_file_sync(
file_path: str,
storage: Optional[str] = "local",
target_path: Optional[str] = None,
target_storage: Optional[str] = None,
media_type: Optional[str] = None,
tmdbid: Optional[int] = None,
doubanid: Optional[str] = None,
season: Optional[int] = None,
transfer_type: Optional[str] = None,
background: Optional[bool] = False,
) -> str:
"""
文件整理链路包含大量同步磁盘与外部服务调用,需要在线程池中运行。
"""
if not file_path:
return "错误:必须提供文件或目录路径"
if storage == "local":
if not file_path.startswith("/") and not (
len(file_path) > 1 and file_path[1] == ":"
):
file_path = str(Path(file_path).resolve())
elif not file_path.startswith("/"):
file_path = "/" + file_path
fileitem = FileItem(
storage=storage or "local",
path=file_path,
type="dir" if file_path.endswith("/") else "file",
)
target_path_obj = Path(target_path) if target_path else None
media_type_enum = None
if media_type:
media_type_enum = MediaType.from_agent(media_type)
if not media_type_enum:
return f"错误:无效的媒体类型 '{media_type}',支持的类型:'movie', 'tv'"
state, errormsg = TransferChain().manual_transfer(
fileitem=fileitem,
target_storage=target_storage,
target_path=target_path_obj,
tmdbid=tmdbid,
doubanid=doubanid,
mtype=media_type_enum,
season=season,
transfer_type=transfer_type,
background=background,
)
if state:
if background:
return f"整理任务已提交到后台运行:{file_path}"
return f"整理成功:{file_path}"
if isinstance(errormsg, list):
error_text = f"整理完成,{len(errormsg)} 个文件转移失败"
if errormsg:
error_text += "\n" + "\n".join(str(e) for e in errormsg[:5])
if len(errormsg) > 5:
error_text += f"\n... 还有 {len(errormsg) - 5} 个错误"
else:
error_text = str(errormsg)
return f"整理失败:{error_text}"
async def run(
self,
file_path: str,
@@ -105,73 +172,20 @@ class TransferFileTool(MoviePilotTool):
)
try:
if not file_path:
return "错误:必须提供文件或目录路径"
# 规范化路径
if storage == "local":
# 本地路径处理
if not file_path.startswith("/") and not (
len(file_path) > 1 and file_path[1] == ":"
):
# 相对路径,尝试转换为绝对路径
file_path = str(Path(file_path).resolve())
else:
# 远程存储路径,确保以/开头
if not file_path.startswith("/"):
file_path = "/" + file_path
# 创建FileItem
fileitem = FileItem(
storage=storage or "local",
path=file_path,
type="dir" if file_path.endswith("/") else "file",
return await self.run_blocking(
"storage",
self._transfer_file_sync,
file_path,
storage,
target_path,
target_storage,
media_type,
tmdbid,
doubanid,
season,
transfer_type,
background,
)
# 处理目标路径
target_path_obj = None
if target_path:
target_path_obj = Path(target_path)
# 处理媒体类型
media_type_enum = None
if media_type:
media_type_enum = MediaType.from_agent(media_type)
if not media_type_enum:
return f"错误:无效的媒体类型 '{media_type}',支持的类型:'movie', 'tv'"
# 调用整理方法
transfer_chain = TransferChain()
state, errormsg = transfer_chain.manual_transfer(
fileitem=fileitem,
target_storage=target_storage,
target_path=target_path_obj,
tmdbid=tmdbid,
doubanid=doubanid,
mtype=media_type_enum,
season=season,
transfer_type=transfer_type,
background=background,
)
if not state:
# 处理错误信息
if isinstance(errormsg, list):
error_text = f"整理完成,{len(errormsg)} 个文件转移失败"
if errormsg:
error_text += f"\n" + "\n".join(
str(e) for e in errormsg[:5]
) # 只显示前5个错误
if len(errormsg) > 5:
error_text += f"\n... 还有 {len(errormsg) - 5} 个错误"
else:
error_text = str(errormsg)
return f"整理失败:{error_text}"
else:
if background:
return f"整理任务已提交到后台运行:{file_path}"
else:
return f"整理成功:{file_path}"
except Exception as e:
logger.error(f"整理文件失败: {e}", exc_info=True)
return f"整理文件时发生错误: {str(e)}"

View File

@@ -47,6 +47,28 @@ class UpdateSiteCookieTool(MoviePilotTool):
return message
@staticmethod
def _update_site_cookie_sync(
site_identifier: int,
username: str,
password: str,
two_step_code: Optional[str] = None,
) -> tuple[Optional[str], bool, str]:
"""
在同步线程里执行站点登录和 Cookie 更新,避免网络登录阻塞协程。
"""
site = SiteOper().get(site_identifier)
if not site:
return None, False, f"未找到站点:{site_identifier},请使用 query_sites 工具查询可用的站点"
status, message = SiteChain().update_cookie(
site_info=site,
username=username,
password=password,
two_step_code=two_step_code,
)
return site.name, status, message
async def run(
self,
site_identifier: int,
@@ -60,25 +82,21 @@ class UpdateSiteCookieTool(MoviePilotTool):
)
try:
site_oper = SiteOper()
site_chain = SiteChain()
site = await site_oper.async_get(site_identifier)
if not site:
return f"未找到站点:{site_identifier},请使用 query_sites 工具查询可用的站点"
# 更新站点Cookie和UA
status, message = site_chain.update_cookie(
site_info=site,
username=username,
password=password,
two_step_code=two_step_code,
site_name, status, message = await self.run_blocking(
"site",
self._update_site_cookie_sync,
site_identifier,
username,
password,
two_step_code,
)
if not site_name:
return message
if status:
return f"站点【{site.name}】Cookie和UA更新成功\n{message}"
return f"站点【{site_name}】Cookie和UA更新成功\n{message}"
else:
return f"站点【{site.name}】Cookie和UA更新失败\n错误原因:{message}"
return f"站点【{site_name}】Cookie和UA更新失败\n错误原因:{message}"
except Exception as e:
logger.error(f"更新站点Cookie和UA失败: {e}", exc_info=True)
return f"更新站点Cookie和UA时发生错误: {str(e)}"

View File

@@ -333,19 +333,20 @@ class ChainBase(metaclass=ABCMeta):
if inspect.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
# 系统同步模块在异步路径里也必须切到线程池,避免阻塞共享事件循环。
result = await run_in_threadpool(func, *args, **kwargs)
elif ObjectUtils.check_signature(func, result):
# 返回结果与方法签名一致,将结果传入
if inspect.iscoroutinefunction(func):
result = await func(result)
else:
result = func(result)
result = await run_in_threadpool(func, result)
elif isinstance(result, list):
# 返回为列表,有多个模块运行结果时进行合并
if inspect.iscoroutinefunction(func):
temp = await func(*args, **kwargs)
else:
temp = func(*args, **kwargs)
temp = await run_in_threadpool(func, *args, **kwargs)
if isinstance(temp, list):
result.extend(temp)
else:

View File

@@ -950,9 +950,13 @@ class DownloadChain(ChainBase):
torrents = self.list_torrents(downloader=name, status=TorrentStatus.DOWNLOADING)
if not torrents:
return []
history_map = DownloadHistoryOper().get_by_hashes(
[torrent.hash for torrent in torrents if torrent.hash]
)
ret_torrents = []
for torrent in torrents:
history = DownloadHistoryOper().get_by_hash(torrent.hash)
history = history_map.get(torrent.hash)
if history:
# 媒体信息
torrent.media = {

View File

@@ -1,4 +1,4 @@
from typing import List, Optional
from typing import Dict, List, Optional
from app.db import DbOper
from app.db.models.downloadhistory import DownloadHistory, DownloadFiles
@@ -23,6 +23,17 @@ class DownloadHistoryOper(DbOper):
"""
return DownloadHistory.get_by_hash(self._db, download_hash)
def get_by_hashes(self, download_hashes: List[str]) -> Dict[str, DownloadHistory]:
"""
批量按 Hash 查询下载记录,并返回以 Hash 为键的映射。
"""
histories = DownloadHistory.get_by_hashes(self._db, download_hashes)
return {
history.download_hash: history
for history in histories
if history and history.download_hash
}
def get_by_mediaid(self, tmdbid: int, doubanid: str) -> List[DownloadHistory]:
"""
按媒体ID查询下载记录

View File

@@ -1,5 +1,5 @@
import time
from typing import Optional
from typing import List, Optional
from sqlalchemy import Column, Integer, String, JSON, select
from sqlalchemy.ext.asyncio import AsyncSession
@@ -69,6 +69,40 @@ class DownloadHistory(Base):
.first()
)
@classmethod
@db_query
def get_by_hashes(cls, db: Session, download_hashes: List[str]):
"""
批量查询多个下载任务的最新历史记录,避免在上层形成 N+1 查询。
"""
normalized_hashes = []
seen_hashes = set()
for download_hash in download_hashes or []:
if not download_hash or download_hash in seen_hashes:
continue
seen_hashes.add(download_hash)
normalized_hashes.append(download_hash)
if not normalized_hashes:
return []
histories = (
db.query(DownloadHistory)
.filter(DownloadHistory.download_hash.in_(normalized_hashes))
.order_by(DownloadHistory.download_hash, DownloadHistory.date.desc())
.all()
)
latest_histories = {}
for history in histories:
if history.download_hash and history.download_hash not in latest_histories:
latest_histories[history.download_hash] = history
return [
latest_histories[download_hash]
for download_hash in normalized_hashes
if download_hash in latest_histories
]
@classmethod
@db_query
def get_by_mediaid(cls, db: Session, tmdbid: int, doubanid: str):

View File

@@ -148,6 +148,12 @@ class SubscribeOper(DbOper):
"""
Subscribe.delete(self._db, rid=sid)
async def async_delete(self, sid: int):
"""
异步删除订阅。
"""
await Subscribe.async_delete(self._db, rid=sid)
def update(self, sid: int, payload: dict) -> Subscribe:
"""
更新订阅

View File

@@ -20,6 +20,12 @@ class TransferHistoryOper(DbOper):
"""
return TransferHistory.get(self._db, historyid)
async def async_get(self, historyid: int) -> TransferHistory:
"""
异步获取转移历史。
"""
return await TransferHistory.async_get(self._db, historyid)
def get_by_title(self, title: str) -> List[TransferHistory]:
"""
按标题查询转移记录
@@ -93,6 +99,12 @@ class TransferHistoryOper(DbOper):
"""
TransferHistory.delete(self._db, historyid)
async def async_delete(self, historyid):
"""
异步删除转移记录。
"""
await TransferHistory.async_delete(self._db, historyid)
def truncate(self):
"""
清空转移记录

View File

@@ -108,6 +108,12 @@ class UserOper(DbOper):
"""
return User.get_by_name(self._db, name)
async def async_get_by_name(self, name: str) -> User:
"""
异步根据用户名获取用户。
"""
return await User.async_get_by_name(self._db, name)
def get_permissions(self, name: str) -> dict:
"""
获取用户权限