refactor(agent): move feedback issue flow into skill scripts

This commit is contained in:
jxxghp
2026-05-21 19:22:27 +08:00
parent b6b5529d19
commit 737bcb5c62
16 changed files with 1683 additions and 4242 deletions

View File

@@ -92,25 +92,12 @@ class AskUserChoiceTool(MoviePilotTool):
def _blocked_by_feedback_quality_gate(self) -> bool:
"""反馈 Issue 质量门槛拒绝后,禁止继续发按钮引导改写。
这是对 ``feedback-issue`` skill 的工具层兜底:模型可能在
``submit_feedback_issue`` 返回 ``rejected_quality`` 后仍调用本工具,
试图让用户选择“提供真实问题描述重新提交”。这会把测试 / 占位内容
的拒绝结果变成绕过指导,因此同一轮 tool context 中直接拦截。
这是对 ``feedback-issue`` skill 的历史兜底:如果同一轮上下文已经
标记反馈内容被质量门槛拒绝,就不能再用按钮诱导用户把测试 / 占位
内容改写成“真实问题”。
"""
return bool(self._agent_context.get("feedback_issue_rejected_quality"))
def _blocked_by_pending_feedback_confirmation(self) -> bool:
"""已经发出 ``prepare_feedback_issue`` 的预览按钮后,禁止再叠一层选择。
Why: Issue #5807 实测中 deepseek 在 prepare 之后又自作主张调
``ask_user_choice``,给用户发了第二个「确认提交 ISSUE」按钮。
两条按钮 → 两次 callback → agent 走两轮 → 同一条成功文案被发 3 次。
从工具层硬拦:发现 ``reply_mode=feedback_issue_confirmation`` 直接拒绝。
"""
return (
self._agent_context.get("reply_mode") == "feedback_issue_confirmation"
)
async def run(
self,
message: str,
@@ -129,18 +116,6 @@ class AskUserChoiceTool(MoviePilotTool):
"请直接结束本次反馈流程。"
)
if self._blocked_by_pending_feedback_confirmation():
logger.warning(
"ask_user_choice blocked while feedback issue preview pending: "
"session_id=%s",
self._session_id,
)
return (
"prepare_feedback_issue 已经发出确认按钮并在等待用户点击,"
"不允许再叠加 ask_user_choice。请直接结束本轮等待用户在"
"现有按钮上点选。"
)
if not self._channel or not self._source:
return "当前不在可回传消息的会话中,无法发起按钮选择"

View File

@@ -1,453 +0,0 @@
"""收集反馈 Issue 提交前需要附带的本地诊断日志。"""
from __future__ import annotations
import json
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool
from app.agent.tools.impl.feedback_issue_state import feedback_issue_state_store
from app.agent.tools.impl.submit_feedback_issue import SubmitFeedbackIssueTool
from app.core.config import settings
from app.log import logger
_MAX_READ_BYTES = 512 * 1024
_MAX_DIAGNOSTIC_LOG_CHARS = 6 * 1024
# 默认时间窗:仅收集最近 30 分钟的日志。
# Why: 用户说「今天 TMDB 一直在报错」时,期望看到的是这次会话前后真实
# 触发的报错,而不是几天前历史日志里所有出现 "TMDB" 的行。Issue #5806
# 实战中就发生了:关键词命中了几天前的测试日志,日志段完全对不上当前问题。
_DEFAULT_TIME_WINDOW_MINUTES = 30
_MIN_TIME_WINDOW_MINUTES = 5
_MAX_TIME_WINDOW_MINUTES = 24 * 60
# MoviePilot 主日志行首格式:``【LEVEL】YYYY-MM-DD HH:MM:SS,ms - module - msg``
# 用第一个时间戳判断行属于哪一刻;匹配不到时把行算到「无法判断时间」桶,
# 默认保留(行内可能是 Traceback 续行,不能丢)。
_LOG_TIMESTAMP_RE = re.compile(r"(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})")
_LOG_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
# 提取日志行的源模块名,用于过滤"Agent 自身 meta-noise"。
_LOG_MODULE_RE = re.compile(
r"^【[^】]+】\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d+\s+-\s+([^\s][^\-]*?)\s+-\s+"
)
# 这些模块产出的日志属于 Agent 自身运行 / 框架内务,对用户面故障定位毫无
# 价值——反而经常把诊断段污染成"反馈流程的回声"tool args dump 里塞着
# ``database / 推荐 / 豆包`` 等关键字,让 keyword 过滤命中一堆 noise
# 真正的 RateLimitError / Traceback 反而被挤掉(参见 #5808 实战)。
#
# 包含两类:
# 1) 反馈流程自己的工具与框架(绝对要排除,否则永远在自我反射)
# 2) 通用 Agent 框架噪音tool dispatch / event bus / streaming callback /
# 通知发送 / activity log 等
_META_NOISE_MODULES = frozenset({
# 反馈流程
"collect_feedback_diagnostics.py",
"prepare_feedback_issue.py",
"submit_feedback_issue.py",
"ask_user_choice.py",
# Agent 框架
"base.py", # tool framework: Executing tool / Tool ... executed
"agent", # agent runtime: Agent推理 / 流式输出
"factory.py", # tool factory creation
"callback", # streaming callback
"prompt", # 提示词加载
"memory.py", # 会话记忆
"activity_log.py", # activity 日志
# 消息/事件总线(往往把 issue 预览全文 dump 进日志)
"message.py",
"event.py",
"chain", # chain - 请求系统模块执行xxx
# 渠道适配层噪音
"discord",
"telegram",
"telegram.py",
# 命令执行agent 自己跑过的 shell 命令 echo
"execute_command.py",
})
# 不允许使用的模糊关键词:通用到几乎每条 log 都会命中、对定位本次问题
# 没有价值。当 keyword 列表只剩这些时退回到「按时间窗口取尾部」。
_VAGUE_KEYWORDS = frozenset({
"错误", "异常", "失败", "error", "exception", "failed", "warn", "warning",
"日志", "问题", "bug", "log", "logs",
})
# 入口意图门:``original_user_request`` 里必须能同时命中"动作"+"目标"
# 工具才允许进入反馈流程。Agent 在用户随口提到「报错」「不工作」时自作
# 主张调用本工具,就会被这里硬挡住——把反馈通道留给真正想给上游提
# Issue 的请求。
#
# 当前威胁模型是「模型过度归因到 upstream bug」不是「对抗性绕过」
# 用户用近义词意图明显时(如「能不能给上游提 issue」SKILL.md 引导
# Agent 在原话里至少保留 ``反馈/提交/上游/issue`` 之一;如果保留不下来,
# Agent 应该回退到本地诊断而不是强行触发反馈。
#
# 第一组动作词(必须出现至少一个):
_FEEDBACK_VERB_PHRASES: tuple[str, ...] = (
"反馈", "提交", "上报", "汇报",
"提 issue", "提issue", "提 bug", "提bug",
"报 bug", "报bug", "报告 bug", "报告bug",
"新建 issue", "新建issue", "开 issue", "开issue",
"让上游", "给上游",
"file an issue", "report a bug", "open an upstream issue",
"submit an issue", "raise an issue", "report this upstream",
"report upstream",
)
# 第二组目标词(动作命中后再校验目标存在):英文 phrase 自带目标可绕过这里。
_FEEDBACK_TARGET_TOKENS: tuple[str, ...] = (
"issue", "bug", "问题", "错误报告",
"上游", "mp", "moviepilot",
)
# 自带目标语义的完整短语:命中后直接放行,不再校验目标词。
_FEEDBACK_STANDALONE_PHRASES: tuple[str, ...] = (
"file an issue", "report a bug", "open an upstream issue",
"submit an issue", "raise an issue", "report this upstream",
"report upstream",
"新建 issue", "新建issue", "开 issue", "开issue",
"提 issue", "提issue", "提 bug", "提bug",
"报 bug", "报bug", "报告 bug", "报告bug",
"让上游", "给上游",
)
# 中文里常见"动词 + 量词/介词 + 目标"模式,用正则承接(最多容忍 6 字符
# 间隔,覆盖"给 MP 提个 bug"、"反馈这个问题"、"报告一个 issue"
_FEEDBACK_REGEX_PATTERNS: tuple[re.Pattern, ...] = (
re.compile(r"提.{0,6}(bug|issue|问题|错误报告)", re.IGNORECASE),
re.compile(r"报.{0,6}(bug|issue|错误报告)", re.IGNORECASE),
re.compile(r"反馈.{0,8}(issue|bug|问题|上游|错误)", re.IGNORECASE),
re.compile(r"开.{0,4}(issue|bug)", re.IGNORECASE),
re.compile(r"上报.{0,6}(bug|issue|问题|错误)", re.IGNORECASE),
)
class CollectFeedbackDiagnosticsInput(BaseModel):
"""反馈诊断日志收集工具输入。"""
explanation: str = Field(
...,
description="Clear explanation of why diagnostic logs are being collected before filing feedback",
)
original_user_request: str = Field(
...,
description="The user's original bug report text that triggered diagnostics collection",
)
keywords: Optional[list[str]] = Field(
default=None,
description=(
"Short keywords to filter logs. Should be SPECIFIC tokens: media title, "
"plugin id, exception class name, downloader name, etc. Vague terms like "
"'错误'/'异常'/'失败'/'error' are ignored because they match almost every log line."
),
)
max_lines: int = Field(
default=80,
description="Maximum matched log lines to return; default 80",
)
time_window_minutes: int = Field(
default=_DEFAULT_TIME_WINDOW_MINUTES,
description=(
"Only include log lines whose timestamp falls within the last N minutes "
"(default 30, range 5-1440). Older lines are dropped regardless of keyword "
"match so the diagnostic snapshot reflects the current incident, not "
"historical noise."
),
)
class CollectFeedbackDiagnosticsTool(MoviePilotTool):
"""收集并缓存反馈 Issue 用的日志片段。"""
name: str = "collect_feedback_diagnostics"
description: str = (
"Collect recent local MoviePilot logs before preparing or submitting a feedback issue. "
"This tool reads config/logs/moviepilot.log and plugin logs, filters by user-provided "
"keywords when available, redacts common secrets, and stores a diagnostics_id that "
"submit_feedback_issue requires. Use it before prepare_feedback_issue."
)
args_schema: Type[BaseModel] = CollectFeedbackDiagnosticsInput
require_admin: bool = True
def get_tool_message(self, **kwargs) -> Optional[str]:
"""侧边消息:告知用户正在读取本地日志辅助反馈。"""
return "收集反馈诊断日志"
@staticmethod
def _read_tail(path: Path) -> str:
"""读取日志文件尾部,避免大日志一次性进入内存。"""
try:
size = path.stat().st_size
with path.open("rb") as file_obj:
if size > _MAX_READ_BYTES:
file_obj.seek(size - _MAX_READ_BYTES)
return file_obj.read().decode("utf-8", errors="replace")
except OSError as err:
logger.debug("读取反馈诊断日志失败: %s %s", path, err)
return ""
@staticmethod
def _candidate_log_files() -> list[Path]:
"""返回反馈诊断可读取的日志文件列表。"""
files = [settings.LOG_PATH / "moviepilot.log"]
plugin_log_dir = settings.LOG_PATH / "plugins"
if plugin_log_dir.exists():
files.extend(sorted(plugin_log_dir.rglob("*.log")))
return [path for path in files if path.exists() and path.is_file()]
@staticmethod
def _normalize_keywords(
original_user_request: str,
keywords: Optional[list[str]],
) -> list[str]:
"""合并用户原话和显式关键词,生成保守的日志过滤词。
Issue #5806 教训:把 "错误 / 异常 / 失败 / TMDB" 这种通用词当关键词
会让几乎所有日志行命中,过滤等于没过滤。这里只保留**显式且足够具体**
≥2 字符且不在 ``_VAGUE_KEYWORDS`` 里)的关键词。"""
normalized: list[str] = []
for item in keywords or []:
item = str(item or "").strip()
if len(item) < 2:
continue
if item.lower() in _VAGUE_KEYWORDS:
continue
if item not in normalized:
normalized.append(item)
return normalized
@staticmethod
def _has_explicit_feedback_intent(original_user_request: str) -> bool:
"""判断用户原话里是否出现了"明确要求提 Issue"的意图。
Why: Agent 在 deepseek 这类强模型里会主动归因——用户只说"TMDB 报
""下载没动"Agent 就跳过本地诊断、直接进入反馈流程。本工具
是反馈流程的入口,硬挡一道意图门,迫使 Agent 回到 SKILL.md Step 0
要求的"先排查、再反馈"路径。
判定规则(先放行更具体的、再回落到组合):
1. 命中 ``_FEEDBACK_STANDALONE_PHRASES`` 任一短语 → 放行。
这些短语已经把"动作 + 目标"打包在一起(如 ``提 issue``、
``file an issue``),无需再二次校验。
2. 同时命中一个 ``_FEEDBACK_VERB_PHRASES`` 动作词和一个
``_FEEDBACK_TARGET_TOKENS`` 目标词 → 放行。能覆盖"反馈这个
问题""提交个 bug""把这个反馈给上游"等自然中文。
3. 否则视为没有明确意图,拒绝。
"""
if not original_user_request:
return False
normalized = original_user_request.lower().strip()
if any(phrase in normalized for phrase in _FEEDBACK_STANDALONE_PHRASES):
return True
if any(p.search(normalized) for p in _FEEDBACK_REGEX_PATTERNS):
return True
has_verb = any(phrase in normalized for phrase in _FEEDBACK_VERB_PHRASES)
has_target = any(token in normalized for token in _FEEDBACK_TARGET_TOKENS)
return has_verb and has_target
@staticmethod
def _normalize_window(time_window_minutes: int) -> int:
"""把传入的时间窗 clamp 到 [5, 1440] 区间。"""
try:
window = int(time_window_minutes or _DEFAULT_TIME_WINDOW_MINUTES)
except (TypeError, ValueError):
window = _DEFAULT_TIME_WINDOW_MINUTES
return max(_MIN_TIME_WINDOW_MINUTES, min(_MAX_TIME_WINDOW_MINUTES, window))
@staticmethod
def _parse_line_timestamp(line: str) -> Optional[datetime]:
"""从一行日志开头提取时间戳;提取不到返回 None。"""
match = _LOG_TIMESTAMP_RE.search(line[:64])
if not match:
return None
try:
return datetime.strptime(match.group(1), _LOG_TIMESTAMP_FORMAT)
except ValueError:
return None
@staticmethod
def _is_meta_noise(line: str) -> bool:
"""判断一行日志是否来自"Agent 自身 meta-noise"模块。
命中即排除。续行(无模块名)由调用方按"跟随父行"语义处理。
"""
match = _LOG_MODULE_RE.match(line)
if not match:
return False
return match.group(1).strip() in _META_NOISE_MODULES
@classmethod
def _filter_lines(
cls,
text: str,
keywords: list[str],
max_lines: int,
window_start: datetime,
) -> list[str]:
"""按时间窗 + 关键词筛日志。
- 行能解析到时间戳:在 ``window_start`` 之前的丢弃;之后的进入候选。
- 行解析不到时间戳Traceback 续行等):跟随**最近一条已知时间戳行**
的归属,没有上下文时按"近期"对待,避免把异常堆栈截断。
- 在候选行里再按关键词过滤;无关键词或全部行都不命中时退回到时间
窗内的尾部行,保证返回有意义的内容而不是空集。
"""
candidates: list[str] = []
last_seen_in_window: Optional[bool] = None
last_seen_was_meta: bool = False
for line in text.splitlines():
if not line.strip():
continue
ts = cls._parse_line_timestamp(line)
if ts is not None:
in_window = ts >= window_start
# Meta-noise 行agent/tool framework 自己的日志)即便落在窗口
# 内也直接丢;它们对用户面故障定位没有价值,反而会因为带有
# ``database / 推荐 / 豆包`` 之类关键字让诊断段灌满 noise。
is_meta = cls._is_meta_noise(line)
last_seen_was_meta = is_meta
last_seen_in_window = in_window and not is_meta
if in_window and not is_meta:
candidates.append(line)
else:
# 续行跟随上一条时间戳行的去留meta-noise 父行的续行也丢)
if last_seen_in_window and not last_seen_was_meta:
candidates.append(line)
if not candidates:
return []
if keywords:
lowered_keywords = [item.lower() for item in keywords]
# 关键字过滤需要按"时间戳行块"为单位:命中的 ERROR 行带着它的
# Traceback 续行一起保留,避免把异常堆栈截掉一半反而更难定位。
matched: list[str] = []
keep_block = False
for line in candidates:
has_ts = cls._parse_line_timestamp(line) is not None
if has_ts:
keep_block = any(kw in line.lower() for kw in lowered_keywords)
if keep_block:
matched.append(line)
elif keep_block:
matched.append(line)
if matched:
return matched[-max_lines:]
return candidates[-max_lines:]
async def run(
self,
original_user_request: str,
keywords: Optional[list[str]] = None,
max_lines: int = 80,
time_window_minutes: int = _DEFAULT_TIME_WINDOW_MINUTES,
**kwargs,
) -> str:
"""读取、筛选、脱敏并缓存本次反馈相关日志。
Issue #5806 暴露的两个数据准确性问题在这里一并修:
1. 时间窗:默认只看最近 30 分钟,杜绝历史无关日志混入。
2. 关键词过滤收紧:剔除"错误/异常/失败"等几乎每行都命中的通用词。
反馈入口意图门(用户反馈):``original_user_request`` 里必须有
明确"我要提 Issue / 反馈 issue / file an issue"之类的短语;
Agent 自作主张把"TMDB 报错"理解成"反馈" 时直接拒绝,引导回归
本地诊断路径,避免给上游刷 Issue。
"""
if not self._has_explicit_feedback_intent(original_user_request):
logger.info(
"collect_feedback_diagnostics 拒绝:原始请求里没有明确"
"反馈意图。原话=%r",
(original_user_request or "")[:120],
)
return json.dumps(
{
"success": False,
"reason": "no_explicit_feedback_intent",
"message": (
"用户原话里没有明确要求向上游反馈 Issue 的短语,"
"不应直接进入反馈流程。请回到常规诊断路径,使用"
"query_subscribes / query_download_tasks / "
"query_logs / test_site 等工具先排查;仅当用户"
"在排查后明确要求把问题转给上游(例如说出 "
"「反馈 issue / 提 issue / 报 bug / 让上游修一下」"
"之类的原话),才能再次调用本工具。"
),
},
ensure_ascii=False,
indent=2,
)
try:
normalized_max_lines = min(max(int(max_lines or 80), 20), 200)
except (TypeError, ValueError):
normalized_max_lines = 80
window_minutes = self._normalize_window(time_window_minutes)
window_start = datetime.now() - timedelta(minutes=window_minutes)
normalized_keywords = self._normalize_keywords(original_user_request, keywords)
collected: list[str] = []
source_files: list[str] = []
log_files = await self.run_blocking("default", self._candidate_log_files)
for path in log_files:
text = await self.run_blocking("default", self._read_tail, path)
if not text:
continue
lines = self._filter_lines(
text, normalized_keywords, normalized_max_lines, window_start
)
if not lines:
continue
source_files.append(str(path))
collected.append(f"### {path.name}\n" + "\n".join(lines))
raw_logs = "\n\n".join(collected)
logs = SubmitFeedbackIssueTool._sanitize_logs(raw_logs, _MAX_DIAGNOSTIC_LOG_CHARS)
found = bool(logs.strip())
record = feedback_issue_state_store.create_diagnostics(
session_id=self._session_id,
user_id=self._user_id,
username=self._username,
logs=logs,
source_files=source_files,
found=found,
)
self._agent_context["feedback_issue_diagnostics_id"] = record.diagnostics_id
# 关键:不要把 ``logs`` 内容回传给 LLM。日志可达 6KB回传后 LLM
# 还会在下一步把它原样塞进 prepare_feedback_issue 的入参里二次
# transit导致 26B/V3 等模型每轮要 ingest+emit 数 KB 文本,响应延
# 迟从秒级飙到分钟级(曾观察到 collect 返回 7.7KB → 下一轮 prepare
# 入参 logs 字段又重复一份)。日志全程只通过 ``diagnostics_id``
# 在服务端的 ``feedback_issue_state_store`` 流转,模型只看到摘要。
log_bytes = len(record.logs.encode("utf-8", errors="replace"))
log_lines = len(record.logs.splitlines()) if record.logs else 0
return json.dumps(
{
"success": True,
"diagnostics_id": record.diagnostics_id,
"found": record.found,
"source_files": record.source_files,
"log_bytes": log_bytes,
"log_lines": log_lines,
"message": (
"已收集并缓存反馈诊断日志。"
if found
else "已完成诊断日志收集,但未找到明显相关日志。"
) + (
"日志已通过 diagnostics_id 缓存在服务端,"
"后续 prepare_feedback_issue / submit_feedback_issue "
"只需传入 diagnostics_id**不要**再把日志正文当参数传回。"
),
},
ensure_ascii=False,
indent=2,
)

View File

@@ -1,261 +0,0 @@
"""反馈 Issue 流程的短期服务端状态。
这里保存两类只应由工具写入的状态:
- 诊断日志收集结果:证明 Agent 在提交前尝试读取过本地日志。
- 用户确认结果:证明用户通过按钮确认过某份预览草稿。
状态只保存在当前进程内,重启后失效;这符合反馈提交这种交互式流程的预期,
也避免把一次性确认 token 持久化到数据库。
"""
from __future__ import annotations
import hashlib
import time
import uuid
from dataclasses import dataclass
from threading import Lock
from typing import Optional
FEEDBACK_CONFIRM_VALUE_PREFIX = "__feedback_issue_confirm__:"
_STATE_TTL_SECONDS = 60 * 60
@dataclass
class FeedbackDiagnosticsRecord:
"""一次反馈诊断日志收集结果。"""
diagnostics_id: str
session_id: str
user_id: str
username: Optional[str]
logs: str
source_files: list[str]
found: bool
created_at: float
@dataclass
class FeedbackConfirmationRecord:
"""一次反馈 Issue 预览确认状态。"""
confirmation_token: str
session_id: str
user_id: str
username: Optional[str]
draft_hash: str
diagnostics_id: str
created_at: float
confirmed_at: Optional[float] = None
def build_feedback_draft_hash(
*,
title: str,
version: str,
environment: str,
issue_type: str,
description: str,
original_user_request: str,
logs: Optional[str],
diagnostics_id: str,
) -> str:
"""为用户确认的 Issue 草稿生成稳定摘要。"""
parts = (
title.strip(),
version.strip(),
environment.strip(),
issue_type.strip(),
description.strip(),
original_user_request.strip(),
(logs or "").strip(),
diagnostics_id.strip(),
)
return hashlib.sha256("\x00".join(parts).encode("utf-8", errors="replace")).hexdigest()
class FeedbackIssueStateStore:
"""管理反馈 Issue 流程的进程内短期状态。"""
def __init__(self) -> None:
self._diagnostics: dict[str, FeedbackDiagnosticsRecord] = {}
self._confirmations: dict[str, FeedbackConfirmationRecord] = {}
self._lock = Lock()
def _cleanup_locked(self) -> None:
expire_before = time.time() - _STATE_TTL_SECONDS
for diagnostics_id, record in list(self._diagnostics.items()):
if record.created_at < expire_before:
self._diagnostics.pop(diagnostics_id, None)
for token, record in list(self._confirmations.items()):
if record.created_at < expire_before:
self._confirmations.pop(token, None)
def create_diagnostics(
self,
*,
session_id: str,
user_id: str,
username: Optional[str],
logs: str,
source_files: list[str],
found: bool,
) -> FeedbackDiagnosticsRecord:
"""登记一次日志收集结果。"""
with self._lock:
self._cleanup_locked()
diagnostics_id = uuid.uuid4().hex[:12]
while diagnostics_id in self._diagnostics:
diagnostics_id = uuid.uuid4().hex[:12]
record = FeedbackDiagnosticsRecord(
diagnostics_id=diagnostics_id,
session_id=session_id,
user_id=str(user_id),
username=username,
logs=logs,
source_files=source_files,
found=found,
created_at=time.time(),
)
self._diagnostics[diagnostics_id] = record
return record
def get_diagnostics(
self,
diagnostics_id: str,
*,
session_id: str,
user_id: str,
) -> Optional[FeedbackDiagnosticsRecord]:
"""按会话和用户读取诊断结果,防止跨用户复用。"""
with self._lock:
self._cleanup_locked()
record = self._diagnostics.get(diagnostics_id)
if not record:
return None
if record.session_id != session_id or record.user_id != str(user_id):
return None
return record
def find_active_confirmation(
self,
*,
session_id: str,
user_id: str,
) -> Optional[FeedbackConfirmationRecord]:
"""查找当前会话/用户尚未消费、且未点击确认的预览 token。
prepare_feedback_issue 会用它判断「上一份预览还挂着,不该再发一份」,
避免 #5806 实测里发了两次同样的确认按钮、用户点了两次的情况。"""
with self._lock:
self._cleanup_locked()
for record in self._confirmations.values():
if (
record.session_id == session_id
and record.user_id == str(user_id)
and record.confirmed_at is None
):
return record
return None
def invalidate_active_confirmations(
self,
*,
session_id: str,
user_id: str,
) -> int:
"""作废当前会话所有未确认的预览 token返回作废数量。
用户在 prepare 之后修改草稿、重新调 prepare 时调用;旧 token 失效
后即便残留消息里的按钮被点击,``mark_confirmed`` 也会因找不到记录
而返回 False避免脏数据驱动提交。"""
with self._lock:
self._cleanup_locked()
to_drop = [
token
for token, record in self._confirmations.items()
if record.session_id == session_id
and record.user_id == str(user_id)
and record.confirmed_at is None
]
for token in to_drop:
self._confirmations.pop(token, None)
return len(to_drop)
def create_confirmation(
self,
*,
session_id: str,
user_id: str,
username: Optional[str],
draft_hash: str,
diagnostics_id: str,
) -> FeedbackConfirmationRecord:
"""创建待用户点击确认的草稿 token。"""
with self._lock:
self._cleanup_locked()
token = uuid.uuid4().hex
while token in self._confirmations:
token = uuid.uuid4().hex
record = FeedbackConfirmationRecord(
confirmation_token=token,
session_id=session_id,
user_id=str(user_id),
username=username,
draft_hash=draft_hash,
diagnostics_id=diagnostics_id,
created_at=time.time(),
)
self._confirmations[token] = record
return record
def mark_confirmed(
self,
token: str,
*,
session_id: str,
user_id: str,
) -> bool:
"""按钮回调命中时,把 token 标记为已由用户确认。"""
with self._lock:
self._cleanup_locked()
record = self._confirmations.get(token)
if not record:
return False
if record.session_id != session_id or record.user_id != str(user_id):
return False
record.confirmed_at = time.time()
return True
def consume_confirmed(
self,
token: str,
*,
session_id: str,
user_id: str,
draft_hash: str,
) -> Optional[FeedbackConfirmationRecord]:
"""消费一次已确认 token内容摘要不一致时拒绝。"""
with self._lock:
self._cleanup_locked()
record = self._confirmations.get(token)
if not record:
return None
if (
record.session_id != session_id
or record.user_id != str(user_id)
or record.draft_hash != draft_hash
or record.confirmed_at is None
):
return None
return self._confirmations.pop(token, None)
def clear(self) -> None:
"""测试和重置场景使用:清空所有短期状态。"""
with self._lock:
self._diagnostics.clear()
self._confirmations.clear()
feedback_issue_state_store = FeedbackIssueStateStore()

View File

@@ -1,285 +0,0 @@
"""生成反馈 Issue 预览并要求用户按钮确认。"""
from __future__ import annotations
import json
from typing import Optional, Type
from pydantic import BaseModel, Field
from app.agent.tools.base import MoviePilotTool, ToolChain
from app.agent.tools.impl.feedback_issue_state import (
FEEDBACK_CONFIRM_VALUE_PREFIX,
build_feedback_draft_hash,
feedback_issue_state_store,
)
from app.agent.tools.impl.submit_feedback_issue import (
ALLOWED_ENVIRONMENTS,
ALLOWED_ISSUE_TYPES,
MAX_TITLE_CHARS,
SubmitFeedbackIssueTool,
)
from app.helper.interaction import AgentInteractionOption, agent_interaction_manager
from app.log import logger
from app.schemas import Notification, NotificationType
from app.schemas.message import ChannelCapabilityManager
from app.schemas.types import MessageChannel
class PrepareFeedbackIssueInput(BaseModel):
"""反馈 Issue 预览确认工具输入。"""
explanation: str = Field(
...,
description="Clear explanation of why a feedback issue preview is being prepared",
)
title: str = Field(..., description="Issue title following `[错误报告]: <短描述>`")
version: str = Field(..., description="Current MoviePilot version")
environment: str = Field(..., description="Exactly Docker or Windows")
issue_type: str = Field(..., description="主程序运行问题 / 插件问题 / 其他问题")
description: str = Field(..., description="Structured issue description")
original_user_request: str = Field(..., description="Verbatim original user request")
diagnostics_id: str = Field(
...,
description=(
"diagnostics_id returned by collect_feedback_diagnostics. Logs are loaded from "
"the server-side state store via this id — do NOT pass the log text itself."
),
)
class PrepareFeedbackIssueTool(MoviePilotTool):
"""发送 Issue 草稿预览,并创建只能由按钮回调确认的 token。"""
name: str = "prepare_feedback_issue"
sends_message: bool = True
description: str = (
"Prepare a feedback issue preview and ask the user to confirm via buttons. "
"Must be called after collect_feedback_diagnostics and before submit_feedback_issue. "
"Returns a confirmation_token, but submit_feedback_issue will only accept it after "
"the user actually clicks the confirmation button."
)
args_schema: Type[BaseModel] = PrepareFeedbackIssueInput
require_admin: bool = True
def get_tool_message(self, **kwargs) -> Optional[str]:
"""侧边消息:告知用户正在生成提交预览。"""
return "生成问题反馈预览并等待确认"
@staticmethod
def _truncate_button_text(text: str, max_length: int) -> str:
"""按渠道限制裁剪按钮文案。"""
if max_length <= 0 or len(text) <= max_length:
return text
if max_length <= 3:
return text[:max_length]
return text[: max_length - 3] + "..."
@staticmethod
def _result_payload(**fields) -> str:
"""统一 JSON 返回,便于 Agent 按字段继续下一步。"""
return json.dumps(fields, ensure_ascii=False, indent=2)
async def run(
self,
title: str,
version: str,
environment: str,
issue_type: str,
description: str,
original_user_request: str,
diagnostics_id: str,
**kwargs,
) -> str:
"""校验草稿、发送预览按钮,并缓存待确认 token。"""
if not self._channel or not self._source:
return self._result_payload(
success=False,
reason="no_channel",
message="当前不在可回传消息的会话中,无法发送 Issue 预览确认按钮。",
)
try:
channel = MessageChannel(self._channel)
except ValueError:
return self._result_payload(
success=False,
reason="unsupported_channel",
message=f"不支持的消息渠道: {self._channel}",
)
if not (
ChannelCapabilityManager.supports_buttons(channel)
and ChannelCapabilityManager.supports_callbacks(channel)
):
return self._result_payload(
success=False,
reason="buttons_unsupported",
message=f"当前渠道 {channel.value} 不支持按钮确认,不能自动提交反馈 Issue。",
)
diagnostics = feedback_issue_state_store.get_diagnostics(
diagnostics_id,
session_id=self._session_id,
user_id=self._user_id,
)
if not diagnostics:
return self._result_payload(
success=False,
reason="diagnostics_missing",
message="缺少有效的诊断日志收集记录,请先调用 collect_feedback_diagnostics。",
)
# 日志全程只从服务端 state store 流转,避免日志在 LLM 上下文里反复
# 进出造成响应延迟(见 collect_feedback_diagnostics 中的设计注释)。
logs = diagnostics.logs
for value, allowed, field_name in (
(environment, ALLOWED_ENVIRONMENTS, "environment"),
(issue_type, ALLOWED_ISSUE_TYPES, "issue_type"),
):
err = SubmitFeedbackIssueTool._validate_enum(value, allowed, field_name)
if err:
return self._result_payload(success=False, reason="invalid_input", message=err)
title = SubmitFeedbackIssueTool._truncate(title, MAX_TITLE_CHARS, marker="")
quality_err = SubmitFeedbackIssueTool._check_content_quality(
title=title,
description=description,
original_user_request=original_user_request,
)
if quality_err:
self._agent_context["feedback_issue_rejected_quality"] = True
self._agent_context["feedback_issue_rejected_quality_reason"] = quality_err
return self._result_payload(
success=False,
reason="rejected_quality",
message=quality_err,
)
draft_hash = build_feedback_draft_hash(
title=title,
version=version,
environment=environment,
issue_type=issue_type,
description=description,
original_user_request=original_user_request,
logs=logs,
diagnostics_id=diagnostics_id,
)
# 同会话/用户已经发过预览且尚未被用户点击确认:拒绝重复发预览。
# Why: Issue #5806 实测中 agent 在一次用户输入里连续调用了两次
# prepare_feedback_issue导致 TG 里出现两份「确认提交」按钮,用户
# 点击两次后才进入提交。这里直接挡住重复预览:草稿一致就复用旧
# token草稿变了则要求 Agent 自己撤销旧 token 再发新预览(以免
# 残留按钮指向过期内容)。
active = feedback_issue_state_store.find_active_confirmation(
session_id=self._session_id,
user_id=self._user_id,
)
if active is not None:
if active.draft_hash == draft_hash:
logger.info(
"feedback issue preview deduped: session_id=%s reuse token=%s",
self._session_id,
active.confirmation_token[:8],
)
self._agent_context["user_reply_sent"] = True
self._agent_context["reply_mode"] = "feedback_issue_confirmation"
return self._result_payload(
success=True,
deduped=True,
confirmation_token=active.confirmation_token,
diagnostics_id=diagnostics_id,
message=(
"上一份相同内容的反馈预览仍在等待用户点击确认,"
"未重复发送按钮。请勿再次调用 prepare_feedback_issue。"
),
)
logger.info(
"feedback issue preview superseded: session_id=%s drop_token=%s",
self._session_id,
active.confirmation_token[:8],
)
feedback_issue_state_store.invalidate_active_confirmations(
session_id=self._session_id,
user_id=self._user_id,
)
confirmation = feedback_issue_state_store.create_confirmation(
session_id=self._session_id,
user_id=self._user_id,
username=self._username,
draft_hash=draft_hash,
diagnostics_id=diagnostics_id,
)
option_value = f"{FEEDBACK_CONFIRM_VALUE_PREFIX}{confirmation.confirmation_token}"
request = agent_interaction_manager.create_request(
session_id=self._session_id,
user_id=str(self._user_id),
channel=channel.value,
source=self._source,
username=self._username,
title="确认提交问题反馈",
prompt="请确认是否将以下问题反馈提交到 MoviePilot 上游仓库。",
options=[
AgentInteractionOption(label="确认提交", value=option_value),
AgentInteractionOption(label="取消提交", value="取消提交问题反馈"),
],
)
max_text_length = ChannelCapabilityManager.get_max_button_text_length(channel)
buttons = [
[
{
"text": self._truncate_button_text("确认提交", max_text_length),
"callback_data": f"agent_interaction:choice:{request.request_id}:1",
}
],
[
{
"text": self._truncate_button_text("取消提交", max_text_length),
"callback_data": f"agent_interaction:choice:{request.request_id}:2",
}
],
]
preview = (
"请确认是否提交以下问题反馈:\n\n"
f"标题:{title}\n"
f"版本:{version}\n"
f"环境:{environment}\n"
f"类型:{issue_type}\n"
f"诊断日志:{'已找到相关日志' if diagnostics.found else '未找到明确相关日志'}\n\n"
f"{description.strip()[:1800]}"
)
await ToolChain().async_post_message(
Notification(
channel=channel,
source=self._source,
mtype=NotificationType.Agent,
userid=self._user_id,
username=self._username,
title="确认提交问题反馈",
text=preview,
buttons=buttons,
)
)
logger.info(
"feedback issue preview sent: session_id=%s diagnostics_id=%s token=%s",
self._session_id,
diagnostics_id,
confirmation.confirmation_token[:8],
)
self._agent_context["user_reply_sent"] = True
self._agent_context["reply_mode"] = "feedback_issue_confirmation"
return self._result_payload(
success=True,
confirmation_token=confirmation.confirmation_token,
diagnostics_id=diagnostics_id,
message=(
"已通过独立通知卡片发送 Issue 预览和「确认提交 / 取消提交」"
"按钮给用户。**本轮对话不要再生成任何额外文字回复**——按钮"
"卡片已经完整表达了 Issue 草稿和操作引导,复述「已生成 "
"Issue 预览,请点击确认按钮」会和卡片重复并让用户困惑。"
"请直接结束本轮,等待用户点击按钮触发下一轮。"
),
)

File diff suppressed because it is too large Load Diff