mirror of
https://github.com/cnlimiter/codex-register.git
synced 2026-05-06 20:02:51 +08:00
fix(newapi): validate token input across ui and api
This commit is contained in:
@@ -23,9 +23,29 @@ def _normalize_base(api_url: str) -> str:
|
||||
return (api_url or "").strip().rstrip("/")
|
||||
|
||||
|
||||
def normalize_authorization_token(header_value: str, header_name: str = "Authorization Token") -> str:
|
||||
normalized_value = (header_value or "").strip()
|
||||
if not normalized_value:
|
||||
raise ValueError(f"{header_name} 不能为空")
|
||||
try:
|
||||
normalized_value.encode("ascii")
|
||||
except UnicodeEncodeError as exc:
|
||||
raise ValueError(f"{header_name} 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明") from exc
|
||||
if any(ord(ch) < 32 or ord(ch) == 127 for ch in normalized_value):
|
||||
raise ValueError(f"{header_name} 包含非法控制字符")
|
||||
return normalized_value
|
||||
|
||||
|
||||
def _mask_header_value(header_value: str, keep: int = 4) -> str:
|
||||
if len(header_value) <= keep * 2:
|
||||
return "*" * len(header_value)
|
||||
return f"{header_value[:keep]}...{header_value[-keep:]}"
|
||||
|
||||
|
||||
def _build_headers(api_key: str) -> dict:
|
||||
safe_api_key = normalize_authorization_token(api_key)
|
||||
return {
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
"Authorization": f"Bearer {safe_api_key}",
|
||||
"New-Api-User": "1",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
@@ -68,7 +88,7 @@ def upload_to_newapi(
|
||||
"auto_ban": 1,
|
||||
"name": account.email or "",
|
||||
"type": resolved_channel_type,
|
||||
"key": json.dumps({"access_token": account.access_token or "", "account_id": account_name}, ensure_ascii=False),
|
||||
"key": json.dumps({"access_token": account.access_token or "", "account_id": account_name}, ensure_ascii=True),
|
||||
"base_url": resolved_channel_base_url,
|
||||
"models": resolved_channel_models,
|
||||
"multi_key_mode": "random",
|
||||
@@ -79,10 +99,20 @@ def upload_to_newapi(
|
||||
}
|
||||
|
||||
try:
|
||||
payload = json.dumps({"mode": "single", "channel": channel}, ensure_ascii=True)
|
||||
headers = _build_headers(api_key)
|
||||
headers["Content-Type"] = "application/json; charset=utf-8"
|
||||
|
||||
logger.info("NEWAPI 上传 URL: %s", url)
|
||||
logger.info("NEWAPI 请求头: %s", {
|
||||
**headers,
|
||||
"Authorization": f"Bearer {_mask_header_value(headers['Authorization'][7:])}",
|
||||
})
|
||||
|
||||
resp = cffi_requests.post(
|
||||
url,
|
||||
headers=_build_headers(api_key),
|
||||
json={"mode": "single", "channel": channel},
|
||||
headers=headers,
|
||||
data=payload.encode("utf-8"),
|
||||
proxies=None,
|
||||
timeout=30,
|
||||
impersonate="chrome110",
|
||||
|
||||
@@ -8,6 +8,7 @@ from pydantic import BaseModel
|
||||
|
||||
from ....database import crud
|
||||
from ....database.session import get_db
|
||||
from ....core.upload.newapi_upload import normalize_authorization_token
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -67,6 +68,13 @@ def _to_response(svc) -> NewapiServiceResponse:
|
||||
)
|
||||
|
||||
|
||||
def _validated_newapi_api_key(api_key: str) -> str:
|
||||
try:
|
||||
return normalize_authorization_token(api_key)
|
||||
except ValueError as exc:
|
||||
raise HTTPException(status_code=400, detail=str(exc)) from exc
|
||||
|
||||
|
||||
@router.get("", response_model=List[NewapiServiceResponse])
|
||||
async def list_newapi_services(enabled: Optional[bool] = None):
|
||||
with get_db() as db:
|
||||
@@ -81,7 +89,7 @@ async def create_newapi_service(request: NewapiServiceCreate):
|
||||
db,
|
||||
name=request.name,
|
||||
api_url=request.api_url,
|
||||
api_key=request.api_key,
|
||||
api_key=_validated_newapi_api_key(request.api_key),
|
||||
channel_type=request.channel_type,
|
||||
channel_base_url=request.channel_base_url,
|
||||
channel_models=request.channel_models,
|
||||
@@ -113,7 +121,7 @@ async def update_newapi_service(service_id: int, request: NewapiServiceUpdate):
|
||||
if request.api_url is not None:
|
||||
update_data["api_url"] = request.api_url
|
||||
if request.api_key:
|
||||
update_data["api_key"] = request.api_key
|
||||
update_data["api_key"] = _validated_newapi_api_key(request.api_key)
|
||||
if request.enabled is not None:
|
||||
update_data["enabled"] = request.enabled
|
||||
if request.priority is not None:
|
||||
|
||||
@@ -1296,6 +1296,28 @@ function closeNewapiServiceModal() {
|
||||
elements.newapiServiceEditModal.classList.remove('active');
|
||||
}
|
||||
|
||||
function validateNewapiApiKeyInput(apiKey, { required = false } = {}) {
|
||||
const normalizedApiKey = String(apiKey || '').trim();
|
||||
if (!normalizedApiKey) {
|
||||
if (required) {
|
||||
return '新增服务时 Root Token / API Key 不能为空';
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
for (const char of normalizedApiKey) {
|
||||
const code = char.charCodeAt(0);
|
||||
if (code > 127) {
|
||||
return 'Root Token / API Key 只能包含 ASCII 字符,请粘贴实际令牌,不要填写中文说明';
|
||||
}
|
||||
if (code < 32 || code === 127) {
|
||||
return 'Root Token / API Key 包含非法控制字符';
|
||||
}
|
||||
}
|
||||
|
||||
return '';
|
||||
}
|
||||
|
||||
async function editNewapiService(id) {
|
||||
try {
|
||||
const service = await api.get(`/newapi-services/${id}`);
|
||||
@@ -1321,8 +1343,9 @@ async function handleSaveNewapiService(e) {
|
||||
toast.error('名称和 API URL 不能为空');
|
||||
return;
|
||||
}
|
||||
if (!id && !apiKey) {
|
||||
toast.error('新增服务时 Root Token / API Key 不能为空');
|
||||
const apiKeyValidationError = validateNewapiApiKeyInput(apiKey, { required: !id });
|
||||
if (apiKeyValidationError) {
|
||||
toast.error(apiKeyValidationError);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -472,6 +472,7 @@
|
||||
<div class="form-group">
|
||||
<label for="newapi-service-key">Root Token / API Key *</label>
|
||||
<input type="password" id="newapi-service-key" placeholder="编辑时留空则保持原值" autocomplete="new-password">
|
||||
<p class="hint">仅支持 ASCII 字符,请直接粘贴系统访问令牌,不要填写中文说明。</p>
|
||||
</div>
|
||||
<div class="form-row">
|
||||
<div class="form-group">
|
||||
|
||||
67
tests/test_newapi_service_routes.py
Normal file
67
tests/test_newapi_service_routes.py
Normal file
@@ -0,0 +1,67 @@
|
||||
import asyncio
|
||||
from contextlib import contextmanager
|
||||
|
||||
import pytest
|
||||
from fastapi import HTTPException
|
||||
|
||||
import src.web.routes.upload.newapi_services as newapi_routes
|
||||
from src.database.session import DatabaseSessionManager
|
||||
from src.web.routes.upload.newapi_services import NewapiServiceCreate, NewapiServiceUpdate
|
||||
|
||||
|
||||
def _build_fake_get_db(manager):
|
||||
@contextmanager
|
||||
def fake_get_db():
|
||||
with manager.session_scope() as session:
|
||||
yield session
|
||||
|
||||
return fake_get_db
|
||||
|
||||
|
||||
def test_create_newapi_service_rejects_non_ascii_api_key(tmp_path, monkeypatch):
|
||||
manager = DatabaseSessionManager(f"sqlite:///{tmp_path}/newapi-create.db")
|
||||
manager.create_tables()
|
||||
manager.migrate_tables()
|
||||
monkeypatch.setattr(newapi_routes, "get_db", _build_fake_get_db(manager))
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
asyncio.run(
|
||||
newapi_routes.create_newapi_service(
|
||||
NewapiServiceCreate(
|
||||
name="bad-token",
|
||||
api_url="https://newapi.example.com",
|
||||
api_key="系统访问令牌 (System Access Token)",
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert exc_info.value.detail == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
|
||||
|
||||
|
||||
def test_update_newapi_service_rejects_non_ascii_api_key(tmp_path, monkeypatch):
|
||||
manager = DatabaseSessionManager(f"sqlite:///{tmp_path}/newapi-update.db")
|
||||
manager.create_tables()
|
||||
manager.migrate_tables()
|
||||
monkeypatch.setattr(newapi_routes, "get_db", _build_fake_get_db(manager))
|
||||
|
||||
created = asyncio.run(
|
||||
newapi_routes.create_newapi_service(
|
||||
NewapiServiceCreate(
|
||||
name="good-token",
|
||||
api_url="https://newapi.example.com",
|
||||
api_key="token-123",
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
asyncio.run(
|
||||
newapi_routes.update_newapi_service(
|
||||
created.id,
|
||||
NewapiServiceUpdate(api_key="系统访问令牌 (System Access Token)"),
|
||||
)
|
||||
)
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert exc_info.value.detail == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
|
||||
58
tests/test_newapi_upload.py
Normal file
58
tests/test_newapi_upload.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from types import SimpleNamespace
|
||||
|
||||
from src.core.upload import newapi_upload
|
||||
|
||||
|
||||
class FakeResponse:
|
||||
def __init__(self, status_code=200, payload=None, text=""):
|
||||
self.status_code = status_code
|
||||
self._payload = payload
|
||||
self.text = text
|
||||
|
||||
def json(self):
|
||||
if self._payload is None:
|
||||
raise ValueError("no json payload")
|
||||
return self._payload
|
||||
|
||||
|
||||
def test_build_headers_rejects_non_ascii_api_key():
|
||||
try:
|
||||
newapi_upload._build_headers("系统访问令牌 (System Access Token)")
|
||||
except ValueError as exc:
|
||||
assert str(exc) == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
|
||||
else:
|
||||
raise AssertionError("expected ValueError")
|
||||
|
||||
|
||||
def test_upload_to_newapi_uses_ascii_authorization_header(monkeypatch):
|
||||
calls = []
|
||||
|
||||
def fake_post(url, **kwargs):
|
||||
calls.append({"url": url, "kwargs": kwargs})
|
||||
return FakeResponse(status_code=201)
|
||||
|
||||
monkeypatch.setattr(newapi_upload.cffi_requests, "post", fake_post)
|
||||
|
||||
success, message = newapi_upload.upload_to_newapi(
|
||||
account=SimpleNamespace(email="tester@example.com", access_token="access-token"),
|
||||
api_url="https://newapi.example.com/",
|
||||
api_key="token-123",
|
||||
)
|
||||
|
||||
assert success is True
|
||||
assert message == "上传成功"
|
||||
assert calls[0]["url"] == "https://newapi.example.com/api/channel/"
|
||||
assert calls[0]["kwargs"]["headers"]["Authorization"] == "Bearer token-123"
|
||||
assert calls[0]["kwargs"]["headers"]["Content-Type"] == "application/json; charset=utf-8"
|
||||
assert calls[0]["kwargs"]["data"].startswith(b"{")
|
||||
|
||||
|
||||
def test_upload_to_newapi_returns_clear_error_for_non_ascii_api_key():
|
||||
success, message = newapi_upload.upload_to_newapi(
|
||||
account=SimpleNamespace(email="tester@example.com", access_token="access-token"),
|
||||
api_url="https://newapi.example.com/",
|
||||
api_key="系统访问令牌 (System Access Token)",
|
||||
)
|
||||
|
||||
assert success is False
|
||||
assert message == "上传异常: Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
|
||||
109
tests/test_settings_newapi_validation.cjs
Normal file
109
tests/test_settings_newapi_validation.cjs
Normal file
@@ -0,0 +1,109 @@
|
||||
const test = require('node:test');
|
||||
const assert = require('node:assert/strict');
|
||||
const fs = require('node:fs');
|
||||
const path = require('node:path');
|
||||
const vm = require('node:vm');
|
||||
|
||||
const SETTINGS_JS_PATH = path.join(__dirname, '..', 'static', 'js', 'settings.js');
|
||||
|
||||
function createClassList() {
|
||||
const values = new Set();
|
||||
return {
|
||||
add(...items) {
|
||||
items.forEach((item) => values.add(item));
|
||||
},
|
||||
remove(...items) {
|
||||
items.forEach((item) => values.delete(item));
|
||||
},
|
||||
contains(item) {
|
||||
return values.has(item);
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function createElementStub(overrides = {}) {
|
||||
return {
|
||||
value: '',
|
||||
checked: false,
|
||||
innerHTML: '',
|
||||
textContent: '',
|
||||
style: {},
|
||||
dataset: {},
|
||||
classList: createClassList(),
|
||||
addEventListener() {},
|
||||
removeEventListener() {},
|
||||
querySelectorAll() {
|
||||
return [];
|
||||
},
|
||||
querySelector() {
|
||||
return null;
|
||||
},
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function createSandbox() {
|
||||
const elements = new Map();
|
||||
|
||||
function getElement(id) {
|
||||
if (!elements.has(id)) {
|
||||
elements.set(id, createElementStub({ id }));
|
||||
}
|
||||
return elements.get(id);
|
||||
}
|
||||
|
||||
const sandbox = {
|
||||
console,
|
||||
setTimeout,
|
||||
clearTimeout,
|
||||
document: {
|
||||
getElementById(id) {
|
||||
return getElement(id);
|
||||
},
|
||||
querySelectorAll() {
|
||||
return [];
|
||||
},
|
||||
addEventListener() {},
|
||||
},
|
||||
window: null,
|
||||
api: {
|
||||
get: async () => [],
|
||||
post: async () => ({ success: true }),
|
||||
patch: async () => ({ success: true }),
|
||||
delete: async () => ({ success: true }),
|
||||
},
|
||||
toast: {
|
||||
success() {},
|
||||
error() {},
|
||||
},
|
||||
confirm: async () => true,
|
||||
escapeHtml(value) {
|
||||
return String(value ?? '');
|
||||
},
|
||||
};
|
||||
|
||||
sandbox.window = sandbox;
|
||||
vm.createContext(sandbox);
|
||||
vm.runInContext(fs.readFileSync(SETTINGS_JS_PATH, 'utf8'), sandbox, { filename: 'settings.js' });
|
||||
return sandbox;
|
||||
}
|
||||
|
||||
test('validateNewapiApiKeyInput rejects non-ascii text', () => {
|
||||
const sandbox = createSandbox();
|
||||
const message = vm.runInContext(
|
||||
"validateNewapiApiKeyInput('系统访问令牌 (System Access Token)', { required: true })",
|
||||
sandbox,
|
||||
);
|
||||
|
||||
assert.equal(message, 'Root Token / API Key 只能包含 ASCII 字符,请粘贴实际令牌,不要填写中文说明');
|
||||
});
|
||||
|
||||
test('validateNewapiApiKeyInput allows ascii token', () => {
|
||||
const sandbox = createSandbox();
|
||||
const message = vm.runInContext(
|
||||
"validateNewapiApiKeyInput('token-123', { required: true })",
|
||||
sandbox,
|
||||
);
|
||||
|
||||
assert.equal(message, '');
|
||||
});
|
||||
Reference in New Issue
Block a user