Merge pull request #109 from MisonL/feature/newapi-token-validation

fix: validate NEWAPI token input across UI and API
This commit is contained in:
演变
2026-03-27 19:11:23 +08:00
committed by GitHub
7 changed files with 317 additions and 8 deletions

View File

@@ -0,0 +1,67 @@
import asyncio
from contextlib import contextmanager
import pytest
from fastapi import HTTPException
import src.web.routes.upload.newapi_services as newapi_routes
from src.database.session import DatabaseSessionManager
from src.web.routes.upload.newapi_services import NewapiServiceCreate, NewapiServiceUpdate
def _build_fake_get_db(manager):
@contextmanager
def fake_get_db():
with manager.session_scope() as session:
yield session
return fake_get_db
def test_create_newapi_service_rejects_non_ascii_api_key(tmp_path, monkeypatch):
manager = DatabaseSessionManager(f"sqlite:///{tmp_path}/newapi-create.db")
manager.create_tables()
manager.migrate_tables()
monkeypatch.setattr(newapi_routes, "get_db", _build_fake_get_db(manager))
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
newapi_routes.create_newapi_service(
NewapiServiceCreate(
name="bad-token",
api_url="https://newapi.example.com",
api_key="系统访问令牌 (System Access Token)",
)
)
)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
def test_update_newapi_service_rejects_non_ascii_api_key(tmp_path, monkeypatch):
manager = DatabaseSessionManager(f"sqlite:///{tmp_path}/newapi-update.db")
manager.create_tables()
manager.migrate_tables()
monkeypatch.setattr(newapi_routes, "get_db", _build_fake_get_db(manager))
created = asyncio.run(
newapi_routes.create_newapi_service(
NewapiServiceCreate(
name="good-token",
api_url="https://newapi.example.com",
api_key="token-123",
)
)
)
with pytest.raises(HTTPException) as exc_info:
asyncio.run(
newapi_routes.update_newapi_service(
created.id,
NewapiServiceUpdate(api_key="系统访问令牌 (System Access Token)"),
)
)
assert exc_info.value.status_code == 400
assert exc_info.value.detail == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"

View File

@@ -0,0 +1,58 @@
from types import SimpleNamespace
from src.core.upload import newapi_upload
class FakeResponse:
def __init__(self, status_code=200, payload=None, text=""):
self.status_code = status_code
self._payload = payload
self.text = text
def json(self):
if self._payload is None:
raise ValueError("no json payload")
return self._payload
def test_build_headers_rejects_non_ascii_api_key():
try:
newapi_upload._build_headers("系统访问令牌 (System Access Token)")
except ValueError as exc:
assert str(exc) == "Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"
else:
raise AssertionError("expected ValueError")
def test_upload_to_newapi_uses_ascii_authorization_header(monkeypatch):
calls = []
def fake_post(url, **kwargs):
calls.append({"url": url, "kwargs": kwargs})
return FakeResponse(status_code=201)
monkeypatch.setattr(newapi_upload.cffi_requests, "post", fake_post)
success, message = newapi_upload.upload_to_newapi(
account=SimpleNamespace(email="tester@example.com", access_token="access-token"),
api_url="https://newapi.example.com/",
api_key="token-123",
)
assert success is True
assert message == "上传成功"
assert calls[0]["url"] == "https://newapi.example.com/api/channel/"
assert calls[0]["kwargs"]["headers"]["Authorization"] == "Bearer token-123"
assert calls[0]["kwargs"]["headers"]["Content-Type"] == "application/json; charset=utf-8"
assert calls[0]["kwargs"]["data"].startswith(b"{")
def test_upload_to_newapi_returns_clear_error_for_non_ascii_api_key():
success, message = newapi_upload.upload_to_newapi(
account=SimpleNamespace(email="tester@example.com", access_token="access-token"),
api_url="https://newapi.example.com/",
api_key="系统访问令牌 (System Access Token)",
)
assert success is False
assert message == "上传异常: Authorization Token 包含非 ASCII 字符,请确认填写的是实际令牌而不是中文说明"

View File

@@ -0,0 +1,109 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const fs = require('node:fs');
const path = require('node:path');
const vm = require('node:vm');
const SETTINGS_JS_PATH = path.join(__dirname, '..', 'static', 'js', 'settings.js');
function createClassList() {
const values = new Set();
return {
add(...items) {
items.forEach((item) => values.add(item));
},
remove(...items) {
items.forEach((item) => values.delete(item));
},
contains(item) {
return values.has(item);
},
};
}
function createElementStub(overrides = {}) {
return {
value: '',
checked: false,
innerHTML: '',
textContent: '',
style: {},
dataset: {},
classList: createClassList(),
addEventListener() {},
removeEventListener() {},
querySelectorAll() {
return [];
},
querySelector() {
return null;
},
...overrides,
};
}
function createSandbox() {
const elements = new Map();
function getElement(id) {
if (!elements.has(id)) {
elements.set(id, createElementStub({ id }));
}
return elements.get(id);
}
const sandbox = {
console,
setTimeout,
clearTimeout,
document: {
getElementById(id) {
return getElement(id);
},
querySelectorAll() {
return [];
},
addEventListener() {},
},
window: null,
api: {
get: async () => [],
post: async () => ({ success: true }),
patch: async () => ({ success: true }),
delete: async () => ({ success: true }),
},
toast: {
success() {},
error() {},
},
confirm: async () => true,
escapeHtml(value) {
return String(value ?? '');
},
};
sandbox.window = sandbox;
vm.createContext(sandbox);
vm.runInContext(fs.readFileSync(SETTINGS_JS_PATH, 'utf8'), sandbox, { filename: 'settings.js' });
return sandbox;
}
test('validateNewapiApiKeyInput rejects non-ascii text', () => {
const sandbox = createSandbox();
const message = vm.runInContext(
"validateNewapiApiKeyInput('系统访问令牌 (System Access Token)', { required: true })",
sandbox,
);
assert.equal(message, 'Root Token / API Key 只能包含 ASCII 字符,请粘贴实际令牌,不要填写中文说明');
});
test('validateNewapiApiKeyInput allows ascii token', () => {
const sandbox = createSandbox();
const message = vm.runInContext(
"validateNewapiApiKeyInput('token-123', { required: true })",
sandbox,
);
assert.equal(message, '');
});