mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-01 13:40:54 +08:00
fix: reduce low-risk pylint issues
This commit is contained in:
@@ -80,4 +80,6 @@ ignore-imports=yes
|
||||
|
||||
[TYPECHECK]
|
||||
# 生成缺失成员提示的类列表
|
||||
generated-members=requests.packages.urllib3
|
||||
generated-members=requests.packages.urllib3
|
||||
# app.helper.sites 会主动隐藏模块属性枚举,避免误报 no-name-in-module
|
||||
ignored-modules=app.helper.sites
|
||||
|
||||
@@ -157,7 +157,7 @@ def _parse_skill_metadata( # noqa: C901
|
||||
MAX_SKILL_COMPATIBILITY_LENGTH,
|
||||
skill_path,
|
||||
)
|
||||
compatibility_str = compatibility_str[:MAX_SKILL_COMPATIBILITY_LENGTH]
|
||||
compatibility_str = str(compatibility_str)[:MAX_SKILL_COMPATIBILITY_LENGTH]
|
||||
|
||||
# 版本号,默认为 0(表示未设置版本)
|
||||
raw_version = frontmatter_data.get("version")
|
||||
|
||||
@@ -236,7 +236,8 @@ class MoviePilotTool(BaseTool, metaclass=ABCMeta):
|
||||
Returns:
|
||||
str: 友好的提示消息,如果返回 None 或空字符串则使用 explanation
|
||||
"""
|
||||
return None
|
||||
explanation = kwargs.get("explanation")
|
||||
return str(explanation) if explanation else None
|
||||
|
||||
@abstractmethod
|
||||
async def run(self, **kwargs) -> str:
|
||||
|
||||
@@ -26,9 +26,11 @@ class UserChoiceOptionInput(BaseModel):
|
||||
|
||||
@model_validator(mode="after")
|
||||
def validate_option(self):
|
||||
if not self.label.strip():
|
||||
label = str(self.label)
|
||||
value = str(self.value)
|
||||
if not label.strip():
|
||||
raise ValueError("label 不能为空")
|
||||
if not self.value.strip():
|
||||
if not value.strip():
|
||||
raise ValueError("value 不能为空")
|
||||
return self
|
||||
|
||||
@@ -55,7 +57,8 @@ class AskUserChoiceInput(BaseModel):
|
||||
|
||||
@model_validator(mode="after")
|
||||
def validate_payload(self):
|
||||
if not self.message.strip():
|
||||
message = str(self.message)
|
||||
if not message.strip():
|
||||
raise ValueError("message 不能为空")
|
||||
if not self.options:
|
||||
raise ValueError("options 至少需要提供一个")
|
||||
|
||||
@@ -6,7 +6,7 @@ import signal
|
||||
import subprocess
|
||||
from dataclasses import dataclass, field
|
||||
from tempfile import NamedTemporaryFile
|
||||
from typing import Optional, TextIO, Type
|
||||
from typing import Any, Optional, TextIO, Type
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
@@ -188,7 +188,7 @@ class ExecuteCommandTool(MoviePilotTool):
|
||||
output.append(stream_name, chunk.decode("utf-8", errors="replace"))
|
||||
|
||||
@staticmethod
|
||||
def _terminate_process(process: asyncio.subprocess.Process, sig: int):
|
||||
def _terminate_process(process: Any, sig: int):
|
||||
"""向进程组发送终止信号;不支持进程组的平台回退为单进程终止。"""
|
||||
try:
|
||||
if os.name == "posix":
|
||||
@@ -203,7 +203,7 @@ class ExecuteCommandTool(MoviePilotTool):
|
||||
@classmethod
|
||||
async def _cleanup_process(
|
||||
cls,
|
||||
process: asyncio.subprocess.Process,
|
||||
process: Any,
|
||||
wait_task: asyncio.Task,
|
||||
) -> None:
|
||||
"""先温和终止,失败后强杀,避免超时 shell 遗留子进程。"""
|
||||
|
||||
@@ -159,7 +159,7 @@ class ChainBase(metaclass=ABCMeta):
|
||||
处理插件模块执行错误
|
||||
"""
|
||||
if kwargs.get("raise_exception"):
|
||||
raise
|
||||
raise err
|
||||
logger.error(
|
||||
f"运行插件 {plugin_id} 模块 {method} 出错:{str(err)}\n{traceback.format_exc()}"
|
||||
)
|
||||
@@ -185,7 +185,7 @@ class ChainBase(metaclass=ABCMeta):
|
||||
处理系统模块执行错误
|
||||
"""
|
||||
if kwargs.get("raise_exception"):
|
||||
raise
|
||||
raise err
|
||||
logger.error(
|
||||
f"运行模块 {module_id}.{method} 出错:{str(err)}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
@@ -788,7 +788,7 @@ class SkillsChain(ChainBase):
|
||||
if skill.source_type == "registry":
|
||||
text_lines.append("社区源,安装前请自行甄别安全性")
|
||||
|
||||
if any(skill.source_type == "registry" for skill in page_items):
|
||||
if any(skill.source_type == "registry" for skill in items):
|
||||
text_lines.extend(
|
||||
[
|
||||
"",
|
||||
|
||||
@@ -998,7 +998,8 @@ def config_list(show_secrets: bool) -> None:
|
||||
@click.argument("key")
|
||||
def config_get(key: str) -> None:
|
||||
"""读取单个配置项"""
|
||||
if key not in Settings.model_fields and not hasattr(settings, key):
|
||||
setting_fields = Settings.model_fields.keys()
|
||||
if key not in setting_fields and not hasattr(settings, key):
|
||||
raise click.ClickException(f"配置项不存在:{key}")
|
||||
click.echo(_format_value(getattr(settings, key)))
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from datetime import datetime
|
||||
from builtins import list as builtin_list
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import Column, Integer, JSON, String, Index, and_, or_, select
|
||||
@@ -34,9 +35,9 @@ class Workflow(Base):
|
||||
# 已执行次数
|
||||
run_count = Column(Integer, default=0)
|
||||
# 任务列表
|
||||
actions = Column(JSON, default=list)
|
||||
actions = Column(JSON, default=builtin_list)
|
||||
# 任务流
|
||||
flows = Column(JSON, default=list)
|
||||
flows = Column(JSON, default=builtin_list)
|
||||
# 执行上下文
|
||||
context = Column(JSON, default=dict)
|
||||
# 创建时间
|
||||
|
||||
@@ -94,7 +94,8 @@ class SkillHelper(metaclass=WeakSingleton):
|
||||
"""
|
||||
返回系统默认的技能市场列表,用于区分内置源和用户追加源。
|
||||
"""
|
||||
default_value = type(settings).model_fields["SKILL_MARKET"].default
|
||||
skill_market_field = type(settings).model_fields.get("SKILL_MARKET")
|
||||
default_value = skill_market_field.default if skill_market_field else None
|
||||
if not default_value:
|
||||
return []
|
||||
return [item.strip() for item in str(default_value).split(",") if item.strip()]
|
||||
|
||||
@@ -583,6 +583,7 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
|
||||
"""
|
||||
轮询监控(改进版)
|
||||
"""
|
||||
monitor_scope = ",".join(str(mon_path) for mon_path in mon_paths) or "未配置路径"
|
||||
with snapshot_lock:
|
||||
try:
|
||||
# 加载上次快照数据
|
||||
@@ -650,10 +651,10 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
|
||||
trigger='interval',
|
||||
minutes=new_interval
|
||||
)
|
||||
logger.info(f"{storage}:{mon_path} 监控间隔已调整为 {new_interval} 分钟")
|
||||
logger.info(f"{storage}:{monitor_scope} 监控间隔已调整为 {new_interval} 分钟")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"轮询监控 {storage}:{mon_path} 出现错误:{e}")
|
||||
logger.error(f"轮询监控 {storage}:{monitor_scope} 出现错误:{e}")
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
def event_handler(self, event, text: str, event_path: str, file_size: float = None):
|
||||
|
||||
Reference in New Issue
Block a user