security(agent): 纯 Python 重构彻底消除 Bash RCE 隐患,引入全局鉴权拦截未授权扫描与 DDoS (v3.0.2)

This commit is contained in:
hotyue
2026-04-10 16:03:31 +00:00
parent 5635c5d851
commit 26fdb8f899

View File

@@ -65,10 +65,28 @@ import os
PORT = int(sys.argv[1])
# 🛡️ [v3.0.2 紧急加固] 提取全局鉴权 Token (利用 CHAT_ID 作为 PSK 预共享密钥)
AUTH_TOKEN = ""
if os.path.exists('/opt/ip_sentinel/config.conf'):
with open('/opt/ip_sentinel/config.conf', 'r') as f:
for line in f:
line = line.strip()
if line.startswith('CHAT_ID='):
AUTH_TOKEN = line.split('=', 1)[1].strip('"\'')
break
class AgentHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
# 路由 1: Google 区域纠偏 (含老版 run 指令兼容)
if self.path == '/trigger_google' or self.path == '/trigger_run':
# 🛡️ 鉴权拦截器:防非法扫描与 DDoS 资源耗尽
if AUTH_TOKEN and f"auth={AUTH_TOKEN}" not in self.path:
self.send_response(401)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"401 Unauthorized: Access Denied\n")
return
# 路由 1: Google 区域纠偏 (由于 URL 带有 auth 参数,必须由 == 改为 startswith)
if self.path.startswith('/trigger_google') or self.path.startswith('/trigger_run'):
if os.path.exists('/opt/ip_sentinel/core/mod_google.sh'):
self.send_response(200)
self.send_header("Content-type", "text/plain")
@@ -82,7 +100,7 @@ class AgentHandler(http.server.BaseHTTPRequestHandler):
self.wfile.write(b"403 Forbidden: Google Module Disabled\n")
# 路由 2: IP 信用净化
elif self.path == '/trigger_trust':
elif self.path.startswith('/trigger_trust'):
if os.path.exists('/opt/ip_sentinel/core/mod_trust.sh'):
self.send_response(200)
self.send_header("Content-type", "text/plain")
@@ -96,29 +114,61 @@ class AgentHandler(http.server.BaseHTTPRequestHandler):
self.wfile.write(b"403 Forbidden: Trust Module Disabled\n")
# 路由 3: 触发战报推送
elif self.path == '/trigger_report':
elif self.path.startswith('/trigger_report'):
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Action Accepted: tg_report\n")
subprocess.Popen(['bash', '/opt/ip_sentinel/core/tg_report.sh'])
# 路由 4: 抓取并回传实时日志
elif self.path == '/trigger_log':
# 路由 4: 抓取并回传实时日志 (v3.0.2 RCE 防御重构)
elif self.path.startswith('/trigger_log'):
self.send_response(200)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Action Accepted: fetch_log\n")
bash_cmd = """
source /opt/ip_sentinel/config.conf
LOG_DATA=$(tail -n 15 /opt/ip_sentinel/logs/sentinel.log)
NODE=$(hostname | cut -c 1-15)
curl -s -X POST "${TG_API_URL}" \
-d "chat_id=${CHAT_ID}" \
-d "text=📄 **[${NODE}] 实时运行日志:**%0A\`\`\`log%0A${LOG_DATA}%0A\`\`\`" \
-d "parse_mode=Markdown"
"""
subprocess.Popen(['bash', '-c', bash_cmd])
# 🛡️ 弃用高危 Bash 拼接,改用纯 Python 安全实现
import urllib.request
import urllib.parse
try:
# 1. 安全读取配置项 (不执行 source)
config = {}
if os.path.exists('/opt/ip_sentinel/config.conf'):
with open('/opt/ip_sentinel/config.conf', 'r') as f:
for line in f:
line = line.strip()
if '=' in line and not line.startswith('#'):
key, val = line.split('=', 1)
config[key] = val.strip('"\'')
# 2. 安全截取日志最后15行
log_data = "日志文件不存在或为空"
log_path = '/opt/ip_sentinel/logs/sentinel.log'
if os.path.exists(log_path):
with open(log_path, 'r', errors='ignore') as f:
lines = f.readlines()
if lines:
log_data = "".join(lines[-15:])
# 3. 安全获取主机名
node_name = subprocess.check_output(['hostname']).decode('utf-8').strip()[:15]
# 4. 构建并发送请求
text_msg = f"📄 **[{node_name}] 实时运行日志:**\n```log\n{log_data}\n```"
data = urllib.parse.urlencode({
'chat_id': config.get('CHAT_ID', ''),
'text': text_msg,
'parse_mode': 'Markdown'
}).encode('utf-8')
req = urllib.request.Request(config.get('TG_API_URL', ''), data=data)
urllib.request.urlopen(req, timeout=10)
except Exception as e:
# 仅在本地静默打印异常,防止信息泄露
print(f"Log fetch error: {e}")
else:
self.send_response(404)