feedback-issue: 拆三步、入口意图门、消息可靠性、日志脱敏与噪音过滤 (#5810)

This commit is contained in:
InfinityPacer
2026-05-21 13:57:12 +08:00
committed by GitHub
parent 4c64b1769d
commit 0245c8db80
13 changed files with 2917 additions and 150 deletions

View File

@@ -89,6 +89,28 @@ class AskUserChoiceTool(MoviePilotTool):
return text[:max_length]
return text[: max_length - 3] + "..."
def _blocked_by_feedback_quality_gate(self) -> bool:
"""反馈 Issue 质量门槛拒绝后,禁止继续发按钮引导改写。
这是对 ``feedback-issue`` skill 的工具层兜底:模型可能在
``submit_feedback_issue`` 返回 ``rejected_quality`` 后仍调用本工具,
试图让用户选择“提供真实问题描述重新提交”。这会把测试 / 占位内容
的拒绝结果变成绕过指导,因此同一轮 tool context 中直接拦截。
"""
return bool(self._agent_context.get("feedback_issue_rejected_quality"))
def _blocked_by_pending_feedback_confirmation(self) -> bool:
"""已经发出 ``prepare_feedback_issue`` 的预览按钮后,禁止再叠一层选择。
Why: Issue #5807 实测中 deepseek 在 prepare 之后又自作主张调
``ask_user_choice``,给用户发了第二个「确认提交 ISSUE」按钮。
两条按钮 → 两次 callback → agent 走两轮 → 同一条成功文案被发 3 次。
从工具层硬拦:发现 ``reply_mode=feedback_issue_confirmation`` 直接拒绝。
"""
return (
self._agent_context.get("reply_mode") == "feedback_issue_confirmation"
)
async def run(
self,
message: str,
@@ -96,6 +118,29 @@ class AskUserChoiceTool(MoviePilotTool):
title: Optional[str] = None,
**kwargs,
) -> str:
if self._blocked_by_feedback_quality_gate():
logger.warning(
"ask_user_choice blocked after feedback issue rejected_quality: "
"session_id=%s",
self._session_id,
)
return (
"反馈 Issue 已被质量门槛拒绝,不能继续发送按钮引导用户改写或重新提交。"
"请直接结束本次反馈流程。"
)
if self._blocked_by_pending_feedback_confirmation():
logger.warning(
"ask_user_choice blocked while feedback issue preview pending: "
"session_id=%s",
self._session_id,
)
return (
"prepare_feedback_issue 已经发出确认按钮并在等待用户点击,"
"不允许再叠加 ask_user_choice。请直接结束本轮等待用户在"
"现有按钮上点选。"
)
if not self._channel or not self._source:
return "当前不在可回传消息的会话中,无法发起按钮选择"

View File

@@ -0,0 +1,453 @@
"""收集反馈 Issue 提交前需要附带的本地诊断日志。"""
from __future__ import annotations
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.impl.feedback_issue_state import feedback_issue_state_store
from app.agent.tools.impl.submit_feedback_issue import SubmitFeedbackIssueTool
from app.core.config import settings
from app.log import logger
_MAX_READ_BYTES = 512 * 1024
_MAX_DIAGNOSTIC_LOG_CHARS = 6 * 1024
# 默认时间窗:仅收集最近 30 分钟的日志。
# Why: 用户说「今天 TMDB 一直在报错」时,期望看到的是这次会话前后真实
# 触发的报错,而不是几天前历史日志里所有出现 "TMDB" 的行。Issue #5806
# 实战中就发生了:关键词命中了几天前的测试日志,日志段完全对不上当前问题。
_DEFAULT_TIME_WINDOW_MINUTES = 30
_MIN_TIME_WINDOW_MINUTES = 5
_MAX_TIME_WINDOW_MINUTES = 24 * 60
# MoviePilot 主日志行首格式:``【LEVEL】YYYY-MM-DD HH:MM:SS,ms - module - msg``
# 用第一个时间戳判断行属于哪一刻;匹配不到时把行算到「无法判断时间」桶,
# 默认保留(行内可能是 Traceback 续行,不能丢)。
_LOG_TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})")
_LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
# 提取日志行的源模块名,用于过滤"Agent 自身 meta-noise"。
_LOG_MODULE_RE = re.compile(
r"^【[^】]+】\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d+\s+-\s+([^\s][^\-]*?)\s+-\s+"
)
# 这些模块产出的日志属于 Agent 自身运行 / 框架内务,对用户面故障定位毫无
# 价值——反而经常把诊断段污染成"反馈流程的回声"tool args dump 里塞着
# ``database / 推荐 / 豆包`` 等关键字,让 keyword 过滤命中一堆 noise
# 真正的 RateLimitError / Traceback 反而被挤掉(参见 #5808 实战)。
#
# 包含两类:
# 1) 反馈流程自己的工具与框架(绝对要排除,否则永远在自我反射)
# 2) 通用 Agent 框架噪音tool dispatch / event bus / streaming callback /
# 通知发送 / activity log 等
_META_NOISE_MODULES = frozenset({
# 反馈流程
"collect_feedback_diagnostics.py",
"prepare_feedback_issue.py",
"submit_feedback_issue.py",
"ask_user_choice.py",
# Agent 框架
"base.py", # tool framework: Executing tool / Tool ... executed
"agent", # agent runtime: Agent推理 / 流式输出
"factory.py", # tool factory creation
"callback", # streaming callback
"prompt", # 提示词加载
"memory.py", # 会话记忆
"activity_log.py", # activity 日志
# 消息/事件总线(往往把 issue 预览全文 dump 进日志)
"message.py",
"event.py",
"chain", # chain - 请求系统模块执行xxx
# 渠道适配层噪音
"discord",
"telegram",
"telegram.py",
# 命令执行agent 自己跑过的 shell 命令 echo
"execute_command.py",
})
# 不允许使用的模糊关键词:通用到几乎每条 log 都会命中、对定位本次问题
# 没有价值。当 keyword 列表只剩这些时退回到「按时间窗口取尾部」。
_VAGUE_KEYWORDS = frozenset({
"错误", "异常", "失败", "error", "exception", "failed", "warn", "warning",
"日志", "问题", "bug", "log", "logs",
})
# 入口意图门:``original_user_request`` 里必须能同时命中"动作"+"目标"
# 工具才允许进入反馈流程。Agent 在用户随口提到「报错」「不工作」时自作
# 主张调用本工具,就会被这里硬挡住——把反馈通道留给真正想给上游提
# Issue 的请求。
#
# 当前威胁模型是「模型过度归因到 upstream bug」不是「对抗性绕过」
# 用户用近义词意图明显时(如「能不能给上游提 issue」SKILL.md 引导
# Agent 在原话里至少保留 ``反馈/提交/上游/issue`` 之一;如果保留不下来,
# Agent 应该回退到本地诊断而不是强行触发反馈。
#
# 第一组动作词(必须出现至少一个):
_FEEDBACK_VERB_PHRASES: tuple[str, ...] = (
"反馈", "提交", "上报", "汇报",
"提 issue", "提issue", "提 bug", "提bug",
"报 bug", "报bug", "报告 bug", "报告bug",
"新建 issue", "新建issue", "开 issue", "开issue",
"让上游", "给上游",
"file an issue", "report a bug", "open an upstream issue",
"submit an issue", "raise an issue", "report this upstream",
"report upstream",
)
# 第二组目标词(动作命中后再校验目标存在):英文 phrase 自带目标可绕过这里。
_FEEDBACK_TARGET_TOKENS: tuple[str, ...] = (
"issue", "bug", "问题", "错误报告",
"上游", "mp", "moviepilot",
)
# 自带目标语义的完整短语:命中后直接放行,不再校验目标词。
_FEEDBACK_STANDALONE_PHRASES: tuple[str, ...] = (
"file an issue", "report a bug", "open an upstream issue",
"submit an issue", "raise an issue", "report this upstream",
"report upstream",
"新建 issue", "新建issue", "开 issue", "开issue",
"提 issue", "提issue", "提 bug", "提bug",
"报 bug", "报bug", "报告 bug", "报告bug",
"让上游", "给上游",
)
# 中文里常见"动词 + 量词/介词 + 目标"模式,用正则承接(最多容忍 6 字符
# 间隔,覆盖"给 MP 提个 bug"、"反馈这个问题"、"报告一个 issue"
_FEEDBACK_REGEX_PATTERNS: tuple[re.Pattern, ...] = (
re.compile(r"提.{0,6}(bug|issue|问题|错误报告)", re.IGNORECASE),
re.compile(r"报.{0,6}(bug|issue|错误报告)", re.IGNORECASE),
re.compile(r"反馈.{0,8}(issue|bug|问题|上游|错误)", re.IGNORECASE),
re.compile(r"开.{0,4}(issue|bug)", re.IGNORECASE),
re.compile(r"上报.{0,6}(bug|issue|问题|错误)", re.IGNORECASE),
)
class CollectFeedbackDiagnosticsInput(BaseModel):
"""反馈诊断日志收集工具输入。"""
explanation: str = Field(
...,
description="Clear explanation of why diagnostic logs are being collected before filing feedback",
)
original_user_request: str = Field(
...,
description="The user's original bug report text that triggered diagnostics collection",
)
keywords: Optional[list[str]] = Field(
default=None,
description=(
"Short keywords to filter logs. Should be SPECIFIC tokens: media title, "
"plugin id, exception class name, downloader name, etc. Vague terms like "
"'错误'/'异常'/'失败'/'error' are ignored because they match almost every log line."
),
)
max_lines: int = Field(
default=80,
description="Maximum matched log lines to return; default 80",
)
time_window_minutes: int = Field(
default=_DEFAULT_TIME_WINDOW_MINUTES,
description=(
"Only include log lines whose timestamp falls within the last N minutes "
"(default 30, range 5-1440). Older lines are dropped regardless of keyword "
"match so the diagnostic snapshot reflects the current incident, not "
"historical noise."
),
)
class CollectFeedbackDiagnosticsTool(MoviePilotTool):
"""收集并缓存反馈 Issue 用的日志片段。"""
name: str = "collect_feedback_diagnostics"
description: str = (
"Collect recent local MoviePilot logs before preparing or submitting a feedback issue. "
"This tool reads config/logs/moviepilot.log and plugin logs, filters by user-provided "
"keywords when available, redacts common secrets, and stores a diagnostics_id that "
"submit_feedback_issue requires. Use it before prepare_feedback_issue."
)
args_schema: Type[BaseModel] = CollectFeedbackDiagnosticsInput
require_admin: bool = True
def get_tool_message(self, **kwargs) -> Optional[str]:
"""侧边消息:告知用户正在读取本地日志辅助反馈。"""
return "收集反馈诊断日志"
@staticmethod
def _read_tail(path: Path) -> str:
"""读取日志文件尾部,避免大日志一次性进入内存。"""
try:
size = path.stat().st_size
with path.open("rb") as file_obj:
if size > _MAX_READ_BYTES:
file_obj.seek(size - _MAX_READ_BYTES)
return file_obj.read().decode("utf-8", errors="replace")
except OSError as err:
logger.debug("读取反馈诊断日志失败: %s %s", path, err)
return ""
@staticmethod
def _candidate_log_files() -> list[Path]:
"""返回反馈诊断可读取的日志文件列表。"""
files = [settings.LOG_PATH / "moviepilot.log"]
plugin_log_dir = settings.LOG_PATH / "plugins"
if plugin_log_dir.exists():
files.extend(sorted(plugin_log_dir.rglob("*.log")))
return [path for path in files if path.exists() and path.is_file()]
@staticmethod
def _normalize_keywords(
original_user_request: str,
keywords: Optional[list[str]],
) -> list[str]:
"""合并用户原话和显式关键词,生成保守的日志过滤词。
Issue #5806 教训:把 "错误 / 异常 / 失败 / TMDB" 这种通用词当关键词
会让几乎所有日志行命中,过滤等于没过滤。这里只保留**显式且足够具体**
≥2 字符且不在 ``_VAGUE_KEYWORDS`` 里)的关键词。"""
normalized: list[str] = []
for item in keywords or []:
item = str(item or "").strip()
if len(item) < 2:
continue
if item.lower() in _VAGUE_KEYWORDS:
continue
if item not in normalized:
normalized.append(item)
return normalized
@staticmethod
def _has_explicit_feedback_intent(original_user_request: str) -> bool:
"""判断用户原话里是否出现了"明确要求提 Issue"的意图。
Why: Agent 在 deepseek 这类强模型里会主动归因——用户只说"TMDB 报
""下载没动"Agent 就跳过本地诊断、直接进入反馈流程。本工具
是反馈流程的入口,硬挡一道意图门,迫使 Agent 回到 SKILL.md Step 0
要求的"先排查、再反馈"路径。
判定规则(先放行更具体的、再回落到组合):
1. 命中 ``_FEEDBACK_STANDALONE_PHRASES`` 任一短语 → 放行。
这些短语已经把"动作 + 目标"打包在一起(如 ``提 issue``、
``file an issue``),无需再二次校验。
2. 同时命中一个 ``_FEEDBACK_VERB_PHRASES`` 动作词和一个
``_FEEDBACK_TARGET_TOKENS`` 目标词 → 放行。能覆盖"反馈这个
问题""提交个 bug""把这个反馈给上游"等自然中文。
3. 否则视为没有明确意图,拒绝。
"""
if not original_user_request:
return False
normalized = original_user_request.lower().strip()
if any(phrase in normalized for phrase in _FEEDBACK_STANDALONE_PHRASES):
return True
if any(p.search(normalized) for p in _FEEDBACK_REGEX_PATTERNS):
return True
has_verb = any(phrase in normalized for phrase in _FEEDBACK_VERB_PHRASES)
has_target = any(token in normalized for token in _FEEDBACK_TARGET_TOKENS)
return has_verb and has_target
@staticmethod
def _normalize_window(time_window_minutes: int) -> int:
"""把传入的时间窗 clamp 到 [5, 1440] 区间。"""
try:
window = int(time_window_minutes or _DEFAULT_TIME_WINDOW_MINUTES)
except (TypeError, ValueError):
window = _DEFAULT_TIME_WINDOW_MINUTES
return max(_MIN_TIME_WINDOW_MINUTES, min(_MAX_TIME_WINDOW_MINUTES, window))
@staticmethod
def _parse_line_timestamp(line: str) -> Optional[datetime]:
"""从一行日志开头提取时间戳;提取不到返回 None。"""
match = _LOG_TIMESTAMP_RE.search(line[:64])
if not match:
return None
try:
return datetime.strptime(match.group(1), _LOG_TIMESTAMP_FORMAT)
except ValueError:
return None
@staticmethod
def _is_meta_noise(line: str) -> bool:
"""判断一行日志是否来自"Agent 自身 meta-noise"模块。
命中即排除。续行(无模块名)由调用方按"跟随父行"语义处理。
"""
match = _LOG_MODULE_RE.match(line)
if not match:
return False
return match.group(1).strip() in _META_NOISE_MODULES
@classmethod
def _filter_lines(
cls,
text: str,
keywords: list[str],
max_lines: int,
window_start: datetime,
) -> list[str]:
"""按时间窗 + 关键词筛日志。
- 行能解析到时间戳:在 ``window_start`` 之前的丢弃;之后的进入候选。
- 行解析不到时间戳Traceback 续行等):跟随**最近一条已知时间戳行**
的归属,没有上下文时按"近期"对待,避免把异常堆栈截断。
- 在候选行里再按关键词过滤;无关键词或全部行都不命中时退回到时间
窗内的尾部行,保证返回有意义的内容而不是空集。
"""
candidates: list[str] = []
last_seen_in_window: Optional[bool] = None
last_seen_was_meta: bool = False
for line in text.splitlines():
if not line.strip():
continue
ts = cls._parse_line_timestamp(line)
if ts is not None:
in_window = ts >= window_start
# Meta-noise 行agent/tool framework 自己的日志)即便落在窗口
# 内也直接丢;它们对用户面故障定位没有价值,反而会因为带有
# ``database / 推荐 / 豆包`` 之类关键字让诊断段灌满 noise。
is_meta = cls._is_meta_noise(line)
last_seen_was_meta = is_meta
last_seen_in_window = in_window and not is_meta
if in_window and not is_meta:
candidates.append(line)
else:
# 续行跟随上一条时间戳行的去留meta-noise 父行的续行也丢)
if last_seen_in_window and not last_seen_was_meta:
candidates.append(line)
if not candidates:
return []
if keywords:
lowered_keywords = [item.lower() for item in keywords]
# 关键字过滤需要按"时间戳行块"为单位:命中的 ERROR 行带着它的
# Traceback 续行一起保留,避免把异常堆栈截掉一半反而更难定位。
matched: list[str] = []
keep_block = False
for line in candidates:
has_ts = cls._parse_line_timestamp(line) is not None
if has_ts:
keep_block = any(kw in line.lower() for kw in lowered_keywords)
if keep_block:
matched.append(line)
elif keep_block:
matched.append(line)
if matched:
return matched[-max_lines:]
return candidates[-max_lines:]
async def run(
self,
original_user_request: str,
keywords: Optional[list[str]] = None,
max_lines: int = 80,
time_window_minutes: int = _DEFAULT_TIME_WINDOW_MINUTES,
**kwargs,
) -> str:
"""读取、筛选、脱敏并缓存本次反馈相关日志。
Issue #5806 暴露的两个数据准确性问题在这里一并修:
1. 时间窗:默认只看最近 30 分钟,杜绝历史无关日志混入。
2. 关键词过滤收紧:剔除"错误/异常/失败"等几乎每行都命中的通用词。
反馈入口意图门(用户反馈):``original_user_request`` 里必须有
明确"我要提 Issue / 反馈 issue / file an issue"之类的短语;
Agent 自作主张把"TMDB 报错"理解成"反馈" 时直接拒绝,引导回归
本地诊断路径,避免给上游刷 Issue。
"""
if not self._has_explicit_feedback_intent(original_user_request):
logger.info(
"collect_feedback_diagnostics 拒绝:原始请求里没有明确"
"反馈意图。原话=%r",
(original_user_request or "")[:120],
)
return json.dumps(
{
"success": False,
"reason": "no_explicit_feedback_intent",
"message": (
"用户原话里没有明确要求向上游反馈 Issue 的短语,"
"不应直接进入反馈流程。请回到常规诊断路径,使用"
"query_subscribes / query_download_tasks / "
"query_logs / test_site 等工具先排查;仅当用户"
"在排查后明确要求把问题转给上游(例如说出 "
"「反馈 issue / 提 issue / 报 bug / 让上游修一下」"
"之类的原话),才能再次调用本工具。"
),
},
ensure_ascii=False,
indent=2,
)
try:
normalized_max_lines = min(max(int(max_lines or 80), 20), 200)
except (TypeError, ValueError):
normalized_max_lines = 80
window_minutes = self._normalize_window(time_window_minutes)
window_start = datetime.now() - timedelta(minutes=window_minutes)
normalized_keywords = self._normalize_keywords(original_user_request, keywords)
collected: list[str] = []
source_files: list[str] = []
log_files = await self.run_blocking("default", self._candidate_log_files)
for path in log_files:
text = await self.run_blocking("default", self._read_tail, path)
if not text:
continue
lines = self._filter_lines(
text, normalized_keywords, normalized_max_lines, window_start
)
if not lines:
continue
source_files.append(str(path))
collected.append(f"### {path.name}\n" + "\n".join(lines))
raw_logs = "\n\n".join(collected)
logs = SubmitFeedbackIssueTool._sanitize_logs(raw_logs, _MAX_DIAGNOSTIC_LOG_CHARS)
found = bool(logs.strip())
record = feedback_issue_state_store.create_diagnostics(
session_id=self._session_id,
user_id=self._user_id,
username=self._username,
logs=logs,
source_files=source_files,
found=found,
)
self._agent_context["feedback_issue_diagnostics_id"] = record.diagnostics_id
# 关键:不要把 ``logs`` 内容回传给 LLM。日志可达 6KB回传后 LLM
# 还会在下一步把它原样塞进 prepare_feedback_issue 的入参里二次
# transit导致 26B/V3 等模型每轮要 ingest+emit 数 KB 文本,响应延
# 迟从秒级飙到分钟级(曾观察到 collect 返回 7.7KB → 下一轮 prepare
# 入参 logs 字段又重复一份)。日志全程只通过 ``diagnostics_id``
# 在服务端的 ``feedback_issue_state_store`` 流转,模型只看到摘要。
log_bytes = len(record.logs.encode("utf-8", errors="replace"))
log_lines = len(record.logs.splitlines()) if record.logs else 0
return json.dumps(
{
"success": True,
"diagnostics_id": record.diagnostics_id,
"found": record.found,
"source_files": record.source_files,
"log_bytes": log_bytes,
"log_lines": log_lines,
"message": (
"已收集并缓存反馈诊断日志。"
if found
else "已完成诊断日志收集,但未找到明显相关日志。"
) + (
"日志已通过 diagnostics_id 缓存在服务端,"
"后续 prepare_feedback_issue / submit_feedback_issue "
"只需传入 diagnostics_id**不要**再把日志正文当参数传回。"
),
},
ensure_ascii=False,
indent=2,
)

View File

@@ -0,0 +1,261 @@
"""反馈 Issue 流程的短期服务端状态。
这里保存两类只应由工具写入的状态:
- 诊断日志收集结果:证明 Agent 在提交前尝试读取过本地日志。
- 用户确认结果:证明用户通过按钮确认过某份预览草稿。
状态只保存在当前进程内,重启后失效;这符合反馈提交这种交互式流程的预期,
也避免把一次性确认 token 持久化到数据库。
"""
from __future__ import annotations
import hashlib
import time
import uuid
from dataclasses import dataclass
from threading import Lock
from typing import Optional
FEEDBACK_CONFIRM_VALUE_PREFIX = "__feedback_issue_confirm__:"
_STATE_TTL_SECONDS = 60 * 60
@dataclass
class FeedbackDiagnosticsRecord:
"""一次反馈诊断日志收集结果。"""
diagnostics_id: str
session_id: str
user_id: str
username: Optional[str]
logs: str
source_files: list[str]
found: bool
created_at: float
@dataclass
class FeedbackConfirmationRecord:
"""一次反馈 Issue 预览确认状态。"""
confirmation_token: str
session_id: str
user_id: str
username: Optional[str]
draft_hash: str
diagnostics_id: str
created_at: float
confirmed_at: Optional[float] = None
def build_feedback_draft_hash(
*,
title: str,
version: str,
environment: str,
issue_type: str,
description: str,
original_user_request: str,
logs: Optional[str],
diagnostics_id: str,
) -> str:
"""为用户确认的 Issue 草稿生成稳定摘要。"""
parts = (
title.strip(),
version.strip(),
environment.strip(),
issue_type.strip(),
description.strip(),
original_user_request.strip(),
(logs or "").strip(),
diagnostics_id.strip(),
)
return hashlib.sha256("\x00".join(parts).encode("utf-8", errors="replace")).hexdigest()
class FeedbackIssueStateStore:
"""管理反馈 Issue 流程的进程内短期状态。"""
def __init__(self) -> None:
self._diagnostics: dict[str, FeedbackDiagnosticsRecord] = {}
self._confirmations: dict[str, FeedbackConfirmationRecord] = {}
self._lock = Lock()
def _cleanup_locked(self) -> None:
expire_before = time.time() - _STATE_TTL_SECONDS
for diagnostics_id, record in list(self._diagnostics.items()):
if record.created_at < expire_before:
self._diagnostics.pop(diagnostics_id, None)
for token, record in list(self._confirmations.items()):
if record.created_at < expire_before:
self._confirmations.pop(token, None)
def create_diagnostics(
self,
*,
session_id: str,
user_id: str,
username: Optional[str],
logs: str,
source_files: list[str],
found: bool,
) -> FeedbackDiagnosticsRecord:
"""登记一次日志收集结果。"""
with self._lock:
self._cleanup_locked()
diagnostics_id = uuid.uuid4().hex[:12]
while diagnostics_id in self._diagnostics:
diagnostics_id = uuid.uuid4().hex[:12]
record = FeedbackDiagnosticsRecord(
diagnostics_id=diagnostics_id,
session_id=session_id,
user_id=str(user_id),
username=username,
logs=logs,
source_files=source_files,
found=found,
created_at=time.time(),
)
self._diagnostics[diagnostics_id] = record
return record
def get_diagnostics(
self,
diagnostics_id: str,
*,
session_id: str,
user_id: str,
) -> Optional[FeedbackDiagnosticsRecord]:
"""按会话和用户读取诊断结果,防止跨用户复用。"""
with self._lock:
self._cleanup_locked()
record = self._diagnostics.get(diagnostics_id)
if not record:
return None
if record.session_id != session_id or record.user_id != str(user_id):
return None
return record
def find_active_confirmation(
self,
*,
session_id: str,
user_id: str,
) -> Optional[FeedbackConfirmationRecord]:
"""查找当前会话/用户尚未消费、且未点击确认的预览 token。
prepare_feedback_issue 会用它判断「上一份预览还挂着,不该再发一份」,
避免 #5806 实测里发了两次同样的确认按钮、用户点了两次的情况。"""
with self._lock:
self._cleanup_locked()
for record in self._confirmations.values():
if (
record.session_id == session_id
and record.user_id == str(user_id)
and record.confirmed_at is None
):
return record
return None
def invalidate_active_confirmations(
self,
*,
session_id: str,
user_id: str,
) -> int:
"""作废当前会话所有未确认的预览 token返回作废数量。
用户在 prepare 之后修改草稿、重新调 prepare 时调用;旧 token 失效
后即便残留消息里的按钮被点击,``mark_confirmed`` 也会因找不到记录
而返回 False避免脏数据驱动提交。"""
with self._lock:
self._cleanup_locked()
to_drop = [
token
for token, record in self._confirmations.items()
if record.session_id == session_id
and record.user_id == str(user_id)
and record.confirmed_at is None
]
for token in to_drop:
self._confirmations.pop(token, None)
return len(to_drop)
def create_confirmation(
self,
*,
session_id: str,
user_id: str,
username: Optional[str],
draft_hash: str,
diagnostics_id: str,
) -> FeedbackConfirmationRecord:
"""创建待用户点击确认的草稿 token。"""
with self._lock:
self._cleanup_locked()
token = uuid.uuid4().hex
while token in self._confirmations:
token = uuid.uuid4().hex
record = FeedbackConfirmationRecord(
confirmation_token=token,
session_id=session_id,
user_id=str(user_id),
username=username,
draft_hash=draft_hash,
diagnostics_id=diagnostics_id,
created_at=time.time(),
)
self._confirmations[token] = record
return record
def mark_confirmed(
self,
token: str,
*,
session_id: str,
user_id: str,
) -> bool:
"""按钮回调命中时,把 token 标记为已由用户确认。"""
with self._lock:
self._cleanup_locked()
record = self._confirmations.get(token)
if not record:
return False
if record.session_id != session_id or record.user_id != str(user_id):
return False
record.confirmed_at = time.time()
return True
def consume_confirmed(
self,
token: str,
*,
session_id: str,
user_id: str,
draft_hash: str,
) -> Optional[FeedbackConfirmationRecord]:
"""消费一次已确认 token内容摘要不一致时拒绝。"""
with self._lock:
self._cleanup_locked()
record = self._confirmations.get(token)
if not record:
return None
if (
record.session_id != session_id
or record.user_id != str(user_id)
or record.draft_hash != draft_hash
or record.confirmed_at is None
):
return None
return self._confirmations.pop(token, None)
def clear(self) -> None:
"""测试和重置场景使用:清空所有短期状态。"""
with self._lock:
self._diagnostics.clear()
self._confirmations.clear()
feedback_issue_state_store = FeedbackIssueStateStore()

View File

@@ -0,0 +1,285 @@
"""生成反馈 Issue 预览并要求用户按钮确认。"""
from __future__ import annotations
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool, ToolChain
from app.agent.tools.impl.feedback_issue_state import (
FEEDBACK_CONFIRM_VALUE_PREFIX,
build_feedback_draft_hash,
feedback_issue_state_store,
)
from app.agent.tools.impl.submit_feedback_issue import (
ALLOWED_ENVIRONMENTS,
ALLOWED_ISSUE_TYPES,
MAX_TITLE_CHARS,
SubmitFeedbackIssueTool,
)
from app.helper.interaction import AgentInteractionOption, agent_interaction_manager
from app.log import logger
from app.schemas import Notification, NotificationType
from app.schemas.message import ChannelCapabilityManager
from app.schemas.types import MessageChannel
class PrepareFeedbackIssueInput(BaseModel):
"""反馈 Issue 预览确认工具输入。"""
explanation: str = Field(
...,
description="Clear explanation of why a feedback issue preview is being prepared",
)
title: str = Field(..., description="Issue title following `[错误报告]: <短描述>`")
version: str = Field(..., description="Current MoviePilot version")
environment: str = Field(..., description="Exactly Docker or Windows")
issue_type: str = Field(..., description="主程序运行问题 / 插件问题 / 其他问题")
description: str = Field(..., description="Structured issue description")
original_user_request: str = Field(..., description="Verbatim original user request")
diagnostics_id: str = Field(
...,
description=(
"diagnostics_id returned by collect_feedback_diagnostics. Logs are loaded from "
"the server-side state store via this id — do NOT pass the log text itself."
),
)
class PrepareFeedbackIssueTool(MoviePilotTool):
"""发送 Issue 草稿预览,并创建只能由按钮回调确认的 token。"""
name: str = "prepare_feedback_issue"
sends_message: bool = True
description: str = (
"Prepare a feedback issue preview and ask the user to confirm via buttons. "
"Must be called after collect_feedback_diagnostics and before submit_feedback_issue. "
"Returns a confirmation_token, but submit_feedback_issue will only accept it after "
"the user actually clicks the confirmation button."
)
args_schema: Type[BaseModel] = PrepareFeedbackIssueInput
require_admin: bool = True
def get_tool_message(self, **kwargs) -> Optional[str]:
"""侧边消息:告知用户正在生成提交预览。"""
return "生成问题反馈预览并等待确认"
@staticmethod
def _truncate_button_text(text: str, max_length: int) -> str:
"""按渠道限制裁剪按钮文案。"""
if max_length <= 0 or len(text) <= max_length:
return text
if max_length <= 3:
return text[:max_length]
return text[: max_length - 3] + "..."
@staticmethod
def _result_payload(**fields) -> str:
"""统一 JSON 返回,便于 Agent 按字段继续下一步。"""
return json.dumps(fields, ensure_ascii=False, indent=2)
async def run(
self,
title: str,
version: str,
environment: str,
issue_type: str,
description: str,
original_user_request: str,
diagnostics_id: str,
**kwargs,
) -> str:
"""校验草稿、发送预览按钮,并缓存待确认 token。"""
if not self._channel or not self._source:
return self._result_payload(
success=False,
reason="no_channel",
message="当前不在可回传消息的会话中,无法发送 Issue 预览确认按钮。",
)
try:
channel = MessageChannel(self._channel)
except ValueError:
return self._result_payload(
success=False,
reason="unsupported_channel",
message=f"不支持的消息渠道: {self._channel}",
)
if not (
ChannelCapabilityManager.supports_buttons(channel)
and ChannelCapabilityManager.supports_callbacks(channel)
):
return self._result_payload(
success=False,
reason="buttons_unsupported",
message=f"当前渠道 {channel.value} 不支持按钮确认,不能自动提交反馈 Issue。",
)
diagnostics = feedback_issue_state_store.get_diagnostics(
diagnostics_id,
session_id=self._session_id,
user_id=self._user_id,
)
if not diagnostics:
return self._result_payload(
success=False,
reason="diagnostics_missing",
message="缺少有效的诊断日志收集记录,请先调用 collect_feedback_diagnostics。",
)
# 日志全程只从服务端 state store 流转,避免日志在 LLM 上下文里反复
# 进出造成响应延迟(见 collect_feedback_diagnostics 中的设计注释)。
logs = diagnostics.logs
for value, allowed, field_name in (
(environment, ALLOWED_ENVIRONMENTS, "environment"),
(issue_type, ALLOWED_ISSUE_TYPES, "issue_type"),
):
err = SubmitFeedbackIssueTool._validate_enum(value, allowed, field_name)
if err:
return self._result_payload(success=False, reason="invalid_input", message=err)
title = SubmitFeedbackIssueTool._truncate(title, MAX_TITLE_CHARS, marker="")
quality_err = SubmitFeedbackIssueTool._check_content_quality(
title=title,
description=description,
original_user_request=original_user_request,
)
if quality_err:
self._agent_context["feedback_issue_rejected_quality"] = True
self._agent_context["feedback_issue_rejected_quality_reason"] = quality_err
return self._result_payload(
success=False,
reason="rejected_quality",
message=quality_err,
)
draft_hash = build_feedback_draft_hash(
title=title,
version=version,
environment=environment,
issue_type=issue_type,
description=description,
original_user_request=original_user_request,
logs=logs,
diagnostics_id=diagnostics_id,
)
# 同会话/用户已经发过预览且尚未被用户点击确认:拒绝重复发预览。
# Why: Issue #5806 实测中 agent 在一次用户输入里连续调用了两次
# prepare_feedback_issue导致 TG 里出现两份「确认提交」按钮,用户
# 点击两次后才进入提交。这里直接挡住重复预览:草稿一致就复用旧
# token草稿变了则要求 Agent 自己撤销旧 token 再发新预览(以免
# 残留按钮指向过期内容)。
active = feedback_issue_state_store.find_active_confirmation(
session_id=self._session_id,
user_id=self._user_id,
)
if active is not None:
if active.draft_hash == draft_hash:
logger.info(
"feedback issue preview deduped: session_id=%s reuse token=%s",
self._session_id,
active.confirmation_token[:8],
)
self._agent_context["user_reply_sent"] = True
self._agent_context["reply_mode"] = "feedback_issue_confirmation"
return self._result_payload(
success=True,
deduped=True,
confirmation_token=active.confirmation_token,
diagnostics_id=diagnostics_id,
message=(
"上一份相同内容的反馈预览仍在等待用户点击确认,"
"未重复发送按钮。请勿再次调用 prepare_feedback_issue。"
),
)
logger.info(
"feedback issue preview superseded: session_id=%s drop_token=%s",
self._session_id,
active.confirmation_token[:8],
)
feedback_issue_state_store.invalidate_active_confirmations(
session_id=self._session_id,
user_id=self._user_id,
)
confirmation = feedback_issue_state_store.create_confirmation(
session_id=self._session_id,
user_id=self._user_id,
username=self._username,
draft_hash=draft_hash,
diagnostics_id=diagnostics_id,
)
option_value = f"{FEEDBACK_CONFIRM_VALUE_PREFIX}{confirmation.confirmation_token}"
request = agent_interaction_manager.create_request(
session_id=self._session_id,
user_id=str(self._user_id),
channel=channel.value,
source=self._source,
username=self._username,
title="确认提交问题反馈",
prompt="请确认是否将以下问题反馈提交到 MoviePilot 上游仓库。",
options=[
AgentInteractionOption(label="确认提交", value=option_value),
AgentInteractionOption(label="取消提交", value="取消提交问题反馈"),
],
)
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
buttons = [
[
{
"text": self._truncate_button_text("确认提交", max_text_length),
"callback_data": f"agent_interaction:choice:{request.request_id}:1",
}
],
[
{
"text": self._truncate_button_text("取消提交", max_text_length),
"callback_data": f"agent_interaction:choice:{request.request_id}:2",
}
],
]
preview = (
"请确认是否提交以下问题反馈:\n\n"
f"标题:{title}\n"
f"版本:{version}\n"
f"环境:{environment}\n"
f"类型:{issue_type}\n"
f"诊断日志:{'已找到相关日志' if diagnostics.found else '未找到明确相关日志'}\n\n"
f"{description.strip()[:1800]}"
)
await ToolChain().async_post_message(
Notification(
channel=channel,
source=self._source,
mtype=NotificationType.Agent,
userid=self._user_id,
username=self._username,
title="确认提交问题反馈",
text=preview,
buttons=buttons,
)
)
logger.info(
"feedback issue preview sent: session_id=%s diagnostics_id=%s token=%s",
self._session_id,
diagnostics_id,
confirmation.confirmation_token[:8],
)
self._agent_context["user_reply_sent"] = True
self._agent_context["reply_mode"] = "feedback_issue_confirmation"
return self._result_payload(
success=True,
confirmation_token=confirmation.confirmation_token,
diagnostics_id=diagnostics_id,
message=(
"已通过独立通知卡片发送 Issue 预览和「确认提交 / 取消提交」"
"按钮给用户。**本轮对话不要再生成任何额外文字回复**——按钮"
"卡片已经完整表达了 Issue 草稿和操作引导,复述「已生成 "
"Issue 预览,请点击确认按钮」会和卡片重复并让用户困惑。"
"请直接结束本轮,等待用户点击按钮触发下一轮。"
),
)

View File

@@ -16,6 +16,7 @@
from __future__ import annotations
import asyncio
import hashlib
import json
import re
@@ -25,8 +26,14 @@ from urllib.parse import quote
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.base import MoviePilotTool, ToolChain
from app.schemas import Notification
from app.agent.tools.impl.feedback_issue_state import (
build_feedback_draft_hash,
feedback_issue_state_store,
)
from app.core.config import settings
from app.db.user_oper import UserOper
from app.log import logger
from app.utils.http import AsyncRequestUtils
@@ -61,24 +68,143 @@ MAX_URL_LOGS_CHARS = 3 * 1024
# 防止 agent 重复触发提交60 秒内同 title+body 哈希命中视为重复。
DEDUP_TTL_SECONDS = 60
# 日志二次脱敏正则:作为 defense-in-depth避免 agent 漏脱敏时把凭据直接
# 写进公网 issue。SKILL.md 要求 agent 主动脱敏,这里只兜最常见的高危模式。
# Per-user rate limit
# - 任意两次提交之间至少 30 分钟冷却(哪怕 title/body 不同),杜绝快速刷屏
# - 24 小时滚动窗口内每用户最多 10 个 Issue杜绝长期大量灌水
# 两者叠加:``require_admin`` 限制了谁能提rate limit 限制了能提多少。
USER_COOLDOWN_SECONDS = 30 * 60
USER_DAILY_QUOTA = 10
USER_DAILY_WINDOW_SECONDS = 24 * 60 * 60
# 防止 _user_submissions 字典在 username 拼写漂移("admin" / "Admin" /
# "admin ")或恶意输入下无限增长。超过此上限时按 LRU 淘汰最久未活跃的桶。
MAX_USER_SUBMISSIONS_BUCKETS = 200
# 内容质量门槛:阻止「测试 issue」「abc」等明显无意义提交。AI 在 SKILL.md
# 中已经被要求"先筛",这里是 defense-in-depth 工具层硬门槛。
MIN_TITLE_BODY_CHARS = 8 # ``[错误报告]: `` 前缀外,标题至少 8 字
MIN_DESCRIPTION_CHARS = 50 # description 整体至少 50 字
TITLE_PREFIX = "[错误报告]:"
# 黑词单title 或 description 命中即拒。匹配为字面包含(大小写不敏感)。
# 不用正则避免误伤合法 bug 描述。条目专注于"明显的占位 / 测试 / 乱码"。
# 注:仅做字面字符串匹配;专业对抗者可以用全角 / 同形 unicode 绕过——
# 当前威胁模型是「失控 LLM / 无意 spam」而非「对抗攻击」可接受。
_QUALITY_BLOCKLIST = (
"测试issue", "测试 issue", "test issue",
"test123", "testtest", "测试测试",
"测试一下", "测试提交", "测试请求", "测试反馈",
"看能否跑通", "能否跑通", "跑通流程", "链路测试",
"模拟问题", "模拟问题描述", "模拟描述", "模拟 bug", "模拟bug",
"编造", "虚假 bug", "虚假bug",
"asdf", "asdfasdf", "qwer", "qwerty", "qweqwe",
"占位", "占个坑", "随便", "随便写",
"abcabc", "xxxxxx", "xxx xxx",
"hello world", "你好世界",
"lorem ipsum", "dolor sit amet",
)
# logs 字段只能承载真实日志;这类短语说明 Agent 把叙述性占位内容塞进了日志。
_FABRICATED_LOG_PHRASES = (
"无相关日志", "没有相关日志", "未捕获到相关日志",
"这是模拟", "模拟问题", "模拟描述", "用户反馈",
)
# 结构化描述信号:工具层不做复杂语义理解,但至少要求 Agent 提交的正文
# 已经区分现象、复现和期望,避免把"用户反馈某模块异常,请协助排查"这类
# 无法复现的泛泛描述伪装成正式 Issue。
_DESCRIPTION_REQUIRED_SIGNALS = (
("现象", ("现象", "报错", "错误", "无法", "失败", "异常")),
("复现步骤", ("复现", "步骤", "触发", "操作", "调用", "点击")),
("期望行为", ("期望", "应该", "预期", "正常")),
)
# 检测乱码 / 重复字符行:连续 8 个或以上**相同**字符视为乱码。
# **排除**常见 Markdown / 日志分隔符:空白、`=`、`-`、`_`、`*`、`#`、
# `~`、`` ` ``、`.`、`/`、`\`、`+`、`|`。这些字符大量重复在合法日志(如
# `========`、`---- separator ----`)或 Markdown 横线(`---`)里常见,
# 不应该被判为乱码。
_REPEAT_GIBBERISH = re.compile(r"([^\s=\-_*#~`./\\+|])\1{7,}", re.UNICODE)
# 日志脱敏:服务端唯一的脱敏入口(``_sanitize_logs``。Agent 不再做客户端
# 脱敏,日志也不进入 LLM 上下文,所以这里是日志写入公网 Issue 之前的最后
# 一道防线,必须尽量覆盖 MoviePilot 本身和常见社区插件可能打印的高危凭据
# 与 PII 模式。规则按"先匹配更具体的形式、再匹配通用 key=value"的顺序排列,
# 避免通用规则吞掉特定上下文。
#
# 当前威胁模型仍是「失控 LLM / 无意 spam / 日志意外漏出」,不是「对抗攻击」;
# 全角变体 / 同形 unicode 绕过不在防护范围内。
_REDACTED = "<REDACTED>"
_REDACTED_PATH = "/<USER>/"
_REDACTED_EMAIL = "<EMAIL>"
_REDACTED_IP = "<IP>"
_SENSITIVE_PATTERNS: tuple[tuple[re.Pattern, str], ...] = (
(re.compile(r"(?i)(Cookie\s*:\s*)[^\r\n]+"), r"\1<REDACTED>"),
(re.compile(r"(?i)(Set-Cookie\s*:\s*)[^\r\n]+"), r"\1<REDACTED>"),
# ---- HTTP 头部凭据 ----------------------------------------------------
(re.compile(r"(?i)(Cookie\s*:\s*)[^\r\n]+"), rf"\1{_REDACTED}"),
(re.compile(r"(?i)(Set-Cookie\s*:\s*)[^\r\n]+"), rf"\1{_REDACTED}"),
(
re.compile(r"(?i)(Authorization\s*:\s*)(Bearer|Basic|Token)\s+\S+"),
r"\1\2 <REDACTED>",
rf"\1\2 {_REDACTED}",
),
(re.compile(r"(?i)(X-(?:Api-Key|Auth-Token|Access-Token)\s*:\s*)\S+"), rf"\1{_REDACTED}"),
# ---- GitHub / 通用 token 字面前缀(即使没有 key= 上下文也覆盖)---------
(re.compile(r"\bghp_[A-Za-z0-9]{20,}\b"), _REDACTED),
(re.compile(r"\bgho_[A-Za-z0-9]{20,}\b"), _REDACTED),
(re.compile(r"\bgithub_pat_[A-Za-z0-9_]{20,}\b"), _REDACTED),
(re.compile(r"\b(sk|xoxb|xoxp|xoxa)-[A-Za-z0-9-]{12,}\b"), _REDACTED),
# ---- MoviePilot 会话 ID``user_<userid>_<timestamp>``):嵌入了 userid
# 即便上下文里没出现 ``session_id=`` 前缀也得脱敏,否则 agent 模块虽被
# meta-noise 过滤掉,其它非 noise 模块也可能在 traceback 里 echo 出这个
# 字面值(见 #5808 教训)。
(re.compile(r"\buser_\d{4,}_\d+\b"), _REDACTED),
# ---- 站点 PT passkey / RSS / IM webhook --------------------------------
(re.compile(r"(?i)\b(passkey|rsskey|authkey|access_key)=[A-Za-z0-9]{8,}"), rf"\1={_REDACTED}"),
(
# 捕获原始分隔符(``:`` 或 ``=``)并在替换中保留,避免把 ``key: val``
# 强制改成 ``key=<REDACTED>`` 破坏日志阅读体验
re.compile(
r"(?i)\b(api[_-]?key|apikey|access[_-]?token|refresh[_-]?token|"
r"passkey|password|secret|token)(\s*[:=]\s*)['\"]?[^\s'\"&\r\n]+"
r"https?://(qyapi\.weixin\.qq\.com|oapi\.dingtalk\.com|open\.feishu\.cn|"
r"hooks\.slack\.com|discord(?:app)?\.com/api/webhooks)/\S+"
),
r"\1\2<REDACTED>",
rf"\1/{_REDACTED}",
),
# ---- 通用 key=value / key: value 凭据 + 用户身份 PII保留原始分隔符---
# 用户标识字段在 #5808 实战里被发现混进 logsTelegram numeric userid /
# GitHub-style username。即便 meta-noise 过滤会丢掉大多数 agent
# framework 日志,仍可能有非 noise 模块(如 plugin / hook打印这些
# 字段,所以此处把"用户身份"也纳入脱敏。
(
re.compile(
r"(?i)\b("
r"api[_-]?key|apikey|access[_-]?token|refresh[_-]?token|id[_-]?token|"
r"client[_-]?secret|client[_-]?id|app[_-]?secret|app[_-]?key|"
r"corp[_-]?secret|corp[_-]?id|agent[_-]?id|"
r"password|secret|token|auth|credential|"
r"chat[_-]?id|webhook|api[_-]?token|bot[_-]?token|"
r"user[_-]?id|userid|username|user[_-]?name|"
r"session[_-]?id|sessionid|"
r"open[_-]?id|openid|union[_-]?id|unionid"
r")(\s*[:=]\s*)['\"]?[^\s'\"&\r\n]{2,}"
),
rf"\1\2{_REDACTED}",
),
# ---- PII邮箱 ----------------------------------------------------------
(
re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}"),
_REDACTED_EMAIL,
),
# ---- PII公网 IPv4保留 127/8、10/8、172.16/12、192.168/16 私网)------
(
re.compile(
r"\b(?!(?:127|10)\.)"
r"(?!172\.(?:1[6-9]|2\d|3[01])\.)"
r"(?!192\.168\.)"
r"(?:\d{1,3}\.){3}\d{1,3}\b"
),
_REDACTED_IP,
),
# ---- 文件路径里的用户名段 ---------------------------------------------
(re.compile(r"/Users/[^/\s]+/"), _REDACTED_PATH),
(re.compile(r"/home/[^/\s]+/"), _REDACTED_PATH),
(re.compile(r"C:\\Users\\[^\\\s]+\\", re.IGNORECASE), r"C:\\Users\\<USER>\\"),
)
@@ -124,14 +250,33 @@ class SubmitFeedbackIssueInput(BaseModel):
...,
description=(
"Markdown-formatted bug description, including 现象 / 复现步骤 / "
"期望行为 / 已定位或推测 / 已尝试的处理 等结构化小节。"
"期望行为 / 已定位或推测 / 已尝试的处理 等结构化小节。Must be "
"based on a real user-observed symptom; do not fabricate or "
"rewrite placeholder/test requests into real-looking bugs."
),
)
logs: Optional[str] = Field(
default=None,
original_user_request: str = Field(
...,
description=(
"Raw backend logs related to the bug. Leave empty if not captured; "
"do NOT fabricate."
"Verbatim original user request that triggered issue filing. "
"Must not be summarized or rewritten. The tool uses this field "
"to reject test/pipeline-validation intent such as 测试 ISSUE or 看能否跑通."
),
)
diagnostics_id: str = Field(
...,
description=(
"diagnostics_id returned by collect_feedback_diagnostics. Required; logs are "
"fetched from the server-side state store using this id. Do NOT pass log text "
"as a separate argument — it has been removed from the schema on purpose to "
"stop the LLM from re-transmitting multi-KB log payloads between tool calls."
),
)
confirmation_token: str = Field(
...,
description=(
"confirmation_token returned by prepare_feedback_issue after the user clicks the "
"confirmation button. Do not invent this value."
),
)
@@ -141,6 +286,17 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
require_admin=True避免任意 TG/飞书用户通过 Bot 触发后给上游刷 Issue。
Skill 层会在 dry-run 阶段做用户确认,本工具再做枚举校验与凭据降级。
**状态持久化与并发说明**
- ``_recent_submissions`` 与 ``_user_submissions`` 都是 ``ClassVar``
进程级缓存,**MoviePilot 重启后清零**。一个失控管理员只要重启容器
就可绕过冷却 / 配额。如果将来需要更强保护,可改为持久化到
``SystemConfigOper`` 或 DB 表里。当前威胁模型是「失误 / 失控 LLM」
而非「专业对抗」,可接受。
- 这两份缓存的读写依赖 Agent 在同一事件循环里串行执行单个工具
调用——asyncio 单线程协程模型下安全。**严禁**在多线程 /
multiprocessing 场景下直接复用本工具实例;如有此需求,需加
``asyncio.Lock`` 守护写入。
"""
name: str = "submit_feedback_issue"
@@ -164,6 +320,12 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
# 兜底,避免上游 issue 列表被重复条目污染。
_recent_submissions: ClassVar[dict[str, float]] = {}
# Per-user rate-limit 状态:{username: [timestamp, ...]}。
# 列表按时间顺序追加,每次检查时同步过滤掉 24h 之前的条目。仅在 admin
# 范围内有效require_admin 已限定调用者必须是 superuser所以条目
# 数量上限可控(即便所有用户都在刷,单条记录也只多到 quota+1 就被拒)。
_user_submissions: ClassVar[dict[str, list]] = {}
def get_tool_message(self, **kwargs) -> Optional[str]:
"""侧边消息:让用户知道 Agent 正在帮他向上游提交反馈。"""
title = kwargs.get("title") or ""
@@ -344,6 +506,221 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
).hexdigest()
cls._recent_submissions[key] = time.time()
@staticmethod
def _normalize_username(username: str) -> str:
"""归一化 username 作为 rate-limit 桶 key。
防止 ``"admin"`` / ``"Admin"`` / ``" admin "`` 这种拼写漂移把同一个
管理员散到多个桶里、绕过冷却。统一小写 + 去前后空白。空串原样返回,
由调用方判定。"""
return (username or "").strip().lower()
@classmethod
def _evict_user_submissions_if_needed(cls) -> None:
"""``_user_submissions`` 字典 key 数量上限保护。
按桶内"最近一次提交时间戳"做 LRU超过 ``MAX_USER_SUBMISSIONS_BUCKETS``
时淘汰最久未活跃的桶,避免恶意 / 漂移输入把字典撑爆。"""
if len(cls._user_submissions) <= MAX_USER_SUBMISSIONS_BUCKETS:
return
# 按桶内最新时间戳升序排序,前 N 个最旧的淘汰
excess = len(cls._user_submissions) - MAX_USER_SUBMISSIONS_BUCKETS
oldest_keys = sorted(
cls._user_submissions.items(),
key=lambda kv: kv[1][-1] if kv[1] else 0,
)[:excess]
for key, _ in oldest_keys:
cls._user_submissions.pop(key, None)
@classmethod
def _check_user_rate_limit(cls, username: str) -> Optional[str]:
"""检查 per-user rate limit30 分钟冷却 + 24h 滚动配额 10 条。
命中冷却时间窗或日配额时返回拒绝消息(含本地化时长描述),未命中则
返回 None。本方法不修改状态仅读记录由 ``_record_user_submission``
在真正发起 API 调用前完成。"""
key = cls._normalize_username(username)
if not key:
# 没有用户名识别走不下去,但 _enforce_superuser 早已拦截过;
# 双重保险下若到此处仍无用户名直接拒绝
return "无法识别调用用户身份rate limit 拒绝以防误用。"
now = time.time()
timestamps = cls._user_submissions.get(key, [])
# 同步清理过期条目(> 24h保持列表短小
active = [ts for ts in timestamps if now - ts < USER_DAILY_WINDOW_SECONDS]
if active != timestamps:
if active:
cls._user_submissions[key] = active
else:
# 全部过期,直接把桶清掉,避免 _user_submissions 长期堆积
# 长尾用户的空 list
cls._user_submissions.pop(key, None)
# 30 分钟冷却
if active:
since_last = now - active[-1]
if since_last < USER_COOLDOWN_SECONDS:
remaining = int(USER_COOLDOWN_SECONDS - since_last)
minutes, seconds = divmod(remaining, 60)
return (
f"为避免给上游刷屏,同一管理员两次提交之间至少间隔 "
f"{USER_COOLDOWN_SECONDS // 60} 分钟。请等 "
f"{minutes}{seconds} 秒后再试。"
)
# 24h 配额
if len(active) >= USER_DAILY_QUOTA:
oldest = active[0]
recover_in = int(USER_DAILY_WINDOW_SECONDS - (now - oldest))
hours, remainder = divmod(recover_in, 3600)
minutes = remainder // 60
return (
f"你今日已提交 {USER_DAILY_QUOTA} 个 Issue已达 24 小时配额上限。"
f"最早一条将在 {hours} 小时 {minutes} 分钟后过期,请到时再提。"
)
return None
@classmethod
def _record_user_submission(cls, username: str) -> None:
"""把本次提交时间戳记入 per-user 状态,供下次 rate limit 检查使用。"""
key = cls._normalize_username(username)
if not key:
return
cls._user_submissions.setdefault(key, []).append(time.time())
cls._evict_user_submissions_if_needed()
@classmethod
def _check_content_quality(
cls,
title: str,
description: str,
original_user_request: str,
) -> Optional[str]:
"""内容质量门槛:长度 + 黑词单 + 乱码三重过滤。
命中任一规则即拒绝,附带具体原因。该检查在 _enforce_superuser /
rate_limit 之后、`_build_issue_body` 之前调用,避免无意义 issue 浪费
上游 maintainer 的 triage 时间。
注:``logs`` 字段已从 Agent 入参里移除,日志改为通过 ``diagnostics_id``
在 state store 里流转Agent 无法伪造其内容,因此这里不再对 logs
做黑词单 / 伪造检查;脱敏仍由 ``_sanitize_logs`` 在服务端兜底。"""
original_stripped = (original_user_request or "").strip()
if not original_stripped:
return (
"缺少原始用户请求,无法判断本次提交是否来自真实故障。"
"请传入触发反馈的用户原话,不能只传改写后的 Issue 草稿。"
)
# 1) title 长度(剔除 ``[错误报告]: `` 前缀后)
title_body = title.strip()
if title_body.startswith(TITLE_PREFIX):
title_body = title_body[len(TITLE_PREFIX):].strip()
if len(title_body) < MIN_TITLE_BODY_CHARS:
return (
f"标题正文太短(剔除 {TITLE_PREFIX!r} 前缀后只有 {len(title_body)} 字,"
f"至少 {MIN_TITLE_BODY_CHARS} 字)。请用一句完整的话概括症状,"
"例如「订阅刷新时 TMDB 识别返回 500」。"
)
# 2) description 长度
desc_stripped = description.strip()
if len(desc_stripped) < MIN_DESCRIPTION_CHARS:
return (
f"问题描述太短({len(desc_stripped)} 字,至少 {MIN_DESCRIPTION_CHARS} 字)。"
"请补充:现象 / 复现步骤 / 期望行为,让 maintainer 能理解问题。"
)
# 3) 结构信号。SKILL.md 要求 Agent 在正文里分清现象、复现、期望;
# 工具层用关键词做保守兜底,拦住"为了跑通流程编的泛泛一句话"。
missing_signals = [
label
for label, choices in _DESCRIPTION_REQUIRED_SIGNALS
if not any(choice in desc_stripped for choice in choices)
]
if missing_signals:
return (
"问题描述缺少可复现 bug 所需的结构信息:"
f"{' / '.join(missing_signals)}。请补充真实现象、触发步骤和期望行为,"
"不要用模拟或泛泛描述跑通提交流程。"
)
# 4) 黑词单。同时检查原始用户请求 + 标题 + 描述,防止 Agent 把
# "测试 ISSUE / 看能否跑通" 改写成真实样式 title/description 后绕过。
haystack = "\n".join(
part for part in (title, description, original_stripped) if part
).lower()
for phrase in _QUALITY_BLOCKLIST:
if phrase.lower() in haystack:
return (
f"原始请求、标题或描述命中明显占位/测试关键词「{phrase}」,"
"已拒绝提交。"
"如果是真实问题,请用正常的中文描述具体现象。"
)
# 5) 乱码:连续 8 个相同字符
match = (
_REPEAT_GIBBERISH.search(title)
or _REPEAT_GIBBERISH.search(description)
or _REPEAT_GIBBERISH.search(original_stripped)
)
if match:
return (
f"标题或描述里出现疑似乱码片段「{match.group(0)[:12]}…」,"
"请用正常文字描述问题。"
)
return None
async def _enforce_superuser(self) -> Optional[str]:
"""强校验当前调用者必须是系统 superuser。
Why: 框架的 ``MoviePilotTool._check_permission`` 仅在 9 个内置渠道
映射 + 渠道配置齐全时才真正生效Web 渠道、未识别渠道、缺配置等情
况下会静默放行(见 ``app/agent/tools/base.py`` 的多条 ``return None``
分支)。``submit_feedback_issue`` 触发的是不可逆的上游写操作,**这
里必须独立做一道硬校验**,不能依赖框架那套渠道映射,否则任意能登
录 MoviePilot 的用户都能向上游刷 issue。
返回 None 表示放行;返回字符串则为拒绝原因(直接作为 LLM 可见的
message"""
username = self._username or ""
if not username:
return (
"submit_feedback_issue 拒绝:当前会话没有绑定 MoviePilot 用户身份,"
"无法确认调用者是否为系统管理员。"
)
# 两次尝试DB 偶发抖动场景下短暂退避 100ms 后再试一次,避免单次失败
# 直接卡死管理员。仍保持 fail-close第二次还失败就拒绝。
user = None
last_err: Optional[Exception] = None
for attempt in range(2):
try:
user = await UserOper().async_get_by_name(username)
last_err = None
break
except Exception as e: # noqa: BLE001 — DB 查询异常不能放行
last_err = e
logger.warning(
f"submit_feedback_issue 校验 superuser 时数据库异常 "
f"(attempt {attempt + 1}/2): {e}"
)
if attempt == 0:
await asyncio.sleep(0.1)
if last_err is not None:
logger.error(
f"submit_feedback_issue 校验 superuser 重试后仍失败: {last_err}"
)
return (
"submit_feedback_issue 拒绝:校验用户身份时发生数据库异常,"
"出于安全考虑本次提交被中止。请稍后重试或联系管理员。"
)
if not user:
return (
f"submit_feedback_issue 拒绝:未在 MoviePilot 中找到用户 "
f"{username!r},无法确认是否为系统管理员。"
)
if not user.is_superuser:
return (
"submit_feedback_issue 拒绝只有系统管理员superuser才能"
"向上游 MoviePilot 仓库提交问题反馈,避免任意用户通过对话"
"代理给上游刷 Issue。请联系管理员代为提交或自行登录管理员"
"账号后再试。"
)
return None
@staticmethod
def _safe_response_dict(response) -> dict:
"""安全解析 HTTP 响应体为 dict。
@@ -377,10 +754,35 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
Why: TG/飞书等渠道下 LLM 转述 1KB+ 长 URL 极易出现字节翻转(低精度量化
模型尤其常见),导致 GitHub 拒绝预填链接。直接走 ToolChain 推送可以
让 URL 经由消息系统原文落地,跳过 LLM 转述链路。
Issue #5806 暴露的副作用:``send_tool_message`` 默认不抑制 TG 网页
预览,导致一条 GitHub URL 通知会自动渲染出 "GitHub" 预览卡片;之后
Agent 又用文本复述了一次 URLTG 再渲染一次 → 一次提交在 TG 里展开
成 3 条卡片。这里直接走 ``ToolChain().async_post_message`` 并显式
``disable_web_page_preview=True`` 关闭预览卡片,配合 SKILL.md 里
"Acknowledge briefly, do NOT repeat the URL" 让最终用户只看到一条
干净的链接消息。
"""
if not self._channel or not self._source:
# 没有可回传消息的会话上下文(典型:后台 capture直接当推送失败处理
logger.debug(
"feedback issue 链接推送跳过:当前无可用消息渠道 / 来源"
)
return False
text = f"{hint}\n\n{url}" if hint else url
try:
text = f"{hint}\n\n{url}" if hint else url
await self.send_tool_message(text, title=title)
await ToolChain().async_post_message(
Notification(
channel=self._channel,
source=self._source,
userid=self._user_id,
username=self._username,
title=title,
text=text,
disable_web_page_preview=True,
)
)
return True
except Exception as e: # noqa: BLE001 — 推送失败不应该让整个工具崩溃
logger.warning(
@@ -399,14 +801,35 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
environment: str,
issue_type: str,
description: str,
logs: Optional[str] = None,
original_user_request: str,
diagnostics_id: str = "",
confirmation_token: str = "",
**kwargs,
) -> str:
"""执行反馈 Issue 提交流程。
所有入参都应来自已确认的真实问题草稿;工具层会再次校验质量、结构、
管理员身份和提交频率,避免 Agent 绕过 skill 预筛后把测试内容提交到
上游。"""
logger.info(
f"执行工具: {self.name}, 标题: {title!r}, 版本: {version!r}, "
f"环境: {environment!r}, 类型: {issue_type!r}"
)
# 0) 硬校验调用者必须是系统 superuser。框架的 _check_permission 在
# Web / 未识别渠道下会静默放行;本工具触发不可逆的上游写动作,
# 必须独立确认调用者身份,不能依赖渠道映射。
deny = await self._enforce_superuser()
if deny:
logger.warning(
f"submit_feedback_issue 拒绝非管理员调用username={self._username!r}"
)
return self._result_payload(
success=False,
reason="forbidden",
message=deny,
)
# 1) 入参枚举校验:失败直接拒绝,不消耗 GitHub 调用次数
for value, allowed, field_name in (
(environment, ALLOWED_ENVIRONMENTS, "environment"),
@@ -423,7 +846,116 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
# 2) 兜底硬约束title 长度截断,避免超出 GitHub 256 字符限制
title = self._truncate(title, MAX_TITLE_CHARS, marker="")
# 3) 同会话内 60 秒去重,防止 agent 多次触发提交同一问题
# 3) 内容质量门槛:长度 + 黑词单 + 乱码。命中表示「明显的无意义提交」,
# 直接拒绝**不给** prefill_url——纵容也是放任这类内容不应该被
# 打开手动提交的旁路。
quality_err = self._check_content_quality(
title=title,
description=description,
original_user_request=original_user_request,
)
if quality_err:
logger.info(
f"拒绝低质量提交username={self._username!r} reason={quality_err[:40]}"
)
# 质量门槛已经明确拒绝后,同一轮对话不应再通过 ask_user_choice
# 引导用户把测试 / 占位内容改写成“真实问题”。这里写入共享
# tool context给后续消息型工具一个硬拦截信号避免模型不遵守
# SKILL.md 时继续发按钮。
self._agent_context["feedback_issue_rejected_quality"] = True
self._agent_context["feedback_issue_rejected_quality_reason"] = quality_err
return self._result_payload(
success=False,
reason="rejected_quality",
message=quality_err,
)
# 4) 反馈提交前必须先由专用工具收集诊断日志。即便日志里没有命中
# 相关片段,也要携带 collect_feedback_diagnostics 返回的
# diagnostics_id证明 Agent 没有跳过日志排查。
diagnostics = feedback_issue_state_store.get_diagnostics(
diagnostics_id,
session_id=self._session_id,
user_id=self._user_id,
)
if not diagnostics:
return self._result_payload(
success=False,
reason="diagnostics_required",
message=(
"提交前必须先调用 collect_feedback_diagnostics 收集本地日志。"
"如果没有找到相关日志,也需要携带该工具返回的 diagnostics_id。"
),
)
# 日志固定从服务端 state store 拉取,模型不允许通过参数注入日志,
# 避免动辄数 KB 的日志在 LLM 上下文中重复流转造成响应缓慢。
logs = diagnostics.logs
# 5) 反馈提交前必须先发送预览并等待用户真实点击确认。确认 token 由
# prepare_feedback_issue 创建、按钮 callback 标记 confirmed模型
# 自行声称“用户已确认”不会通过这里。
draft_hash = build_feedback_draft_hash(
title=title,
version=version,
environment=environment,
issue_type=issue_type,
description=description,
original_user_request=original_user_request,
logs=logs,
diagnostics_id=diagnostics_id,
)
confirmation = feedback_issue_state_store.consume_confirmed(
confirmation_token,
session_id=self._session_id,
user_id=self._user_id,
draft_hash=draft_hash,
)
if not confirmation:
return self._result_payload(
success=False,
reason="confirmation_required",
message=(
"提交前必须先调用 prepare_feedback_issue 发送预览,并等待用户"
"点击确认按钮;当前 confirmation_token 无效、未确认或草稿"
"内容已被修改。"
),
)
# 6) Per-user rate limit30 分钟冷却 + 24h 配额 10 条。命中后**仍**
# 给 prefill_url避免误伤"短时间内确实有第二个真 bug 要报"的
# 场景——让管理员可以走浏览器手动提,但 Agent 不会代理刷上游。
rate_err = self._check_user_rate_limit(self._username or "")
if rate_err:
prefill_url = self._build_prefill_url(
title=title,
version=version,
environment=environment,
issue_type=issue_type,
description=description,
logs=logs,
)
pushed = await self._push_url_to_user(
url=prefill_url,
title="问题反馈 - 已达提交频率上限",
hint=rate_err + "\n\n如果确实是另一个真实问题,可点击下方链接到 GitHub 手动提交。",
)
logger.warning(
f"submit_feedback_issue 触发 rate limitusername={self._username!r}"
)
return self._result_payload(
success=False,
reason="rate_limited_user",
url_delivered=pushed,
prefill_url=None if pushed else prefill_url,
message=(
rate_err + " (已通过独立消息把手动提交的预填链接发给用户。)"
if pushed
else
rate_err + " (独立消息推送失败,请把 prefill_url 原样转给用户。)"
),
)
# 7) 同会话内 60 秒去重,防止 agent 多次触发提交同一问题
body_preview = self._build_issue_body(
version=version,
environment=environment,
@@ -445,7 +977,12 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
),
)
# 4) 始终先生成兜底 URL无论后面走哪条路径都能用上
# 通过所有前置校验,记录一次「该管理员发起了一次提交」到 rate-limit
# 状态。**包括** no_token 兜底场景——避免管理员通过反复触发兜底来无
# 限次刷预填 URL 给自己。
self._record_user_submission(self._username or "")
# 8) 始终先生成兜底 URL无论后面走哪条路径都能用上
prefill_url = self._build_prefill_url(
title=title,
version=version,
@@ -455,7 +992,7 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
logs=logs,
)
# 5) 没有 token 时直接降级到 URL 兜底
# 9) 没有 token 时直接降级到 URL 兜底
if not settings.GITHUB_TOKEN:
logger.warning(
"未配置 GITHUB_TOKENfeedback issue 降级到预填 URL 通道"
@@ -487,7 +1024,7 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
),
)
# 6) 调 GitHub REST API。POST /issues 必须带 Bearer Token
# 10) 调 GitHub REST API。POST /issues 必须带 Bearer Token
# GITHUB_HEADERS 已经填好 Authorization & UA再补 Content-Type
# 与 Accept 以满足 GitHub 推荐头规范。复用 body_preview避免
# 重新构造一次_build_issue_body 已经做了脱敏与长度兜底)。
@@ -504,9 +1041,10 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
"labels": ["bug"],
}
# 在真正发起 API 调用前先 record确保后续任何结果(成功 / 失败 /
# 网络异常)都会被纳入 60 秒去重窗口,避免 agent 因 LLM loop 在短
# 时间内反复触发提交。
# 在真正发起 API 调用前先 record 一次内容哈希,确保后续任何结果
# (成功 / 失败 / 网络异常)都会被纳入 60 秒去重窗口,避免 agent
# 因 LLM loop 或网络重试在短时间内反复触发提交。per-user rate-limit
# 状态已经在前置校验通过后记录,这里不再重复。
self._record_submission(title, body)
try:
@@ -589,9 +1127,11 @@ class SubmitFeedbackIssueTool(MoviePilotTool):
# send 失败才把 URL 退给 LLM 转述兜底
issue_url=None if pushed else html_url,
message=(
f"Issue 已成功提交{FEEDBACK_REPO}#{number},并通过独立"
"消息把链接推给用户,请在对话中简短告知用户提交成功并"
"请其等待 maintainer 回复。"
"Issue 已成功提交,并通过独立通知卡片把链接发给用户。"
"**本轮对话只允许输出一句中文简短确认**例如「Issue 已"
"提交,等待 maintainer 跟进。」——禁止重复 issue 编号 / "
"仓库名 / URL禁止说「提交链接已通过通知通道发送」"
"之类的实现细节。通知卡片已经把全部信息展示给用户。"
if pushed
else
f"Issue 已成功提交到 {FEEDBACK_REPO}#{number}"