fix(agent): 引入 HTML 转义与 UA 伪装,修复日志回传中的 Markdown 解析冲突 (v3.0.2-final)

This commit is contained in:
hotyue
2026-04-10 16:25:04 +00:00
parent 26fdb8f899
commit 4a77cb66d4

View File

@@ -62,6 +62,7 @@ import socketserver
import subprocess
import sys
import os
import html # [v3.0.2+ 修复] 用于安全转义日志中的特殊字符
PORT = int(sys.argv[1])
@@ -121,19 +122,17 @@ class AgentHandler(http.server.BaseHTTPRequestHandler):
self.wfile.write(b"Action Accepted: tg_report\n")
subprocess.Popen(['bash', '/opt/ip_sentinel/core/tg_report.sh'])
# 路由 4: 抓取并回传实时日志 (v3.0.2 RCE 防御重构)
# 路由 4: 抓取并回传实时日志 (v3.0.2 鲁棒性增强版)
elif self.path.startswith('/trigger_log'):
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Action Accepted: fetch_log\n")
# 🛡️ 弃用高危 Bash 拼接,改用纯 Python 安全实现
import urllib.request
import urllib.parse
try:
# 1. 安全读取配置项 (不执行 source)
config = {}
if os.path.exists('/opt/ip_sentinel/config.conf'):
with open('/opt/ip_sentinel/config.conf', 'r') as f:
@@ -143,32 +142,37 @@ class AgentHandler(http.server.BaseHTTPRequestHandler):
key, val = line.split('=', 1)
config[key] = val.strip('"\'')
# 2. 安全截取日志最后15行
# 🛡️ 核心修复HTML 转义防止 Telegram 报错
log_data = "日志文件不存在或为空"
log_path = '/opt/ip_sentinel/logs/sentinel.log'
if os.path.exists(log_path):
with open(log_path, 'r', errors='ignore') as f:
lines = f.readlines()
if lines:
log_data = "".join(lines[-15:])
# 抓取最后 15 行并进行转义,确保 [ ] & < > 不会破坏消息
log_data = html.escape("".join(lines[-15:]))
# 3. 安全获取主机名
node_name = subprocess.check_output(['hostname']).decode('utf-8').strip()[:15]
# 4. 构建并发送请求
text_msg = f"📄 **[{node_name}] 实时运行日志:**\n```log\n{log_data}\n```"
# 🛡️ 核心修复:使用 HTML 模式,日志显示更整齐且稳定
text_msg = f"📄 <b>[{node_name}] 实时运行日志:</b>\n<pre><code>{log_data}</code></pre>"
data = urllib.parse.urlencode({
'chat_id': config.get('CHAT_ID', ''),
'text': text_msg,
'parse_mode': 'Markdown'
'parse_mode': 'HTML'
}).encode('utf-8')
req = urllib.request.Request(config.get('TG_API_URL', ''), data=data)
# 🛡️ 核心修复:补全 UA 头,通过安全网关校验
req = urllib.request.Request(
config.get('TG_API_URL', ''),
data=data,
headers={'User-Agent': 'IP-Sentinel-Agent/3.0.2'}
)
urllib.request.urlopen(req, timeout=10)
except Exception as e:
# 仅在本地静默打印异常,防止信息泄露
print(f"Log fetch error: {e}")
# 发生错误时在本地打印,便于长官排查
print(f"Log transmission failed: {e}")
else:
self.send_response(404)