feat: add Codex auth login and export flow

Add Codex Auth support in account management so selected accounts can
complete a Codex-compatible OAuth login flow and export usable auth.json
files.

This commit includes:
- account-management UI entrypoints for Codex Auth login and auth.json download
- backend SSE routes for single-account and batch Codex Auth login execution
- persistence of freshly returned Codex-compatible tokens back into the account database
- Codex auth export support for direct auth.json download and batch zip packaging
- tests covering the Codex Auth login flow and export behavior

The OTP verification failure was caused by manually sending a second OTP after
password verification. The flow now reuses the existing proven login path:
login re-entry, password verification, automatic OTP reception, consent page
handling, workspace selection, and OAuth callback exchange.

Successful logins now also persist workspace_id together with the refreshed
Codex-compatible tokens, making later re-export of auth.json possible without
requiring the browser-downloaded file to still exist locally.

Change-Id: I59df518ef4dc05f8bc52c734dd1b738fcb0b7a4e
This commit is contained in:
Solo
2026-03-25 16:31:03 +08:00
parent 1cbb95f91c
commit f4d0327f67
11 changed files with 1266 additions and 13 deletions

View File

@@ -1,12 +1,14 @@
"""
账号管理 API 路由
"""
import asyncio
import io
import json
import logging
import threading
import zipfile
from datetime import datetime
from typing import List, Optional
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, HTTPException, Query, BackgroundTasks, Body
from fastapi.responses import StreamingResponse
@@ -29,6 +31,98 @@ logger = logging.getLogger(__name__)
router = APIRouter()
def _get_account_extra_data(account: Account) -> Dict[str, Any]:
extra_data = account.extra_data
if isinstance(extra_data, dict):
return dict(extra_data)
return {}
def _build_codex_auth_extra_data(
existing_extra_data: Optional[Dict[str, Any]],
*,
workspace_id: str = "",
generated_at: Optional[datetime] = None,
) -> Dict[str, Any]:
extra_data = dict(existing_extra_data or {})
codex_auth = dict(extra_data.get("codex_auth") or {})
codex_auth["generated"] = True
codex_auth["generated_at"] = (generated_at or datetime.utcnow()).isoformat()
if workspace_id:
codex_auth["workspace_id"] = workspace_id
extra_data["codex_auth"] = codex_auth
return extra_data
def _has_generated_codex_auth(account: Account) -> bool:
codex_auth = _get_account_extra_data(account).get("codex_auth")
return isinstance(codex_auth, dict) and bool(codex_auth.get("generated"))
def _ensure_codex_auth_export_ready(accounts: List[Account]) -> None:
missing = [acc.email for acc in accounts if not _has_generated_codex_auth(acc)]
if not missing:
return
missing_summary = "".join(missing[:10])
if len(missing) > 10:
missing_summary += f"{len(missing)} 个账号"
raise HTTPException(
status_code=400,
detail=(
"以下账号尚未生成 Codex Auth请先在账号管理中点击「Codex Auth 登录」后再导出:"
f"{missing_summary}"
),
)
def _persist_codex_auth_result(
db,
*,
account_id: int,
auth_json: Dict[str, Any],
workspace_id: str = "",
) -> None:
account = crud.get_account_by_id(db, account_id)
if not account:
raise ValueError(f"账号不存在: {account_id}")
tokens = auth_json.get("tokens") or {}
openai_account_id = str(tokens.get("account_id") or "").strip()
workspace_id = str(workspace_id or "").strip()
update_kwargs = {
"access_token": tokens.get("access_token", ""),
"refresh_token": tokens.get("refresh_token", ""),
"id_token": tokens.get("id_token", ""),
"last_refresh": datetime.utcnow(),
"extra_data": _build_codex_auth_extra_data(
_get_account_extra_data(account),
workspace_id=workspace_id,
),
}
if openai_account_id:
update_kwargs["account_id"] = openai_account_id
if workspace_id:
update_kwargs["workspace_id"] = workspace_id
for key, value in update_kwargs.items():
setattr(account, key, value)
token_values = {
"access_token": account.access_token,
"refresh_token": account.refresh_token,
"id_token": account.id_token,
"session_token": account.session_token,
}
account.token_sync_status = "pending" if any(token_values.values()) else "not_ready"
account.token_sync_updated_at = datetime.utcnow()
db.commit()
db.refresh(account)
def _get_proxy(request_proxy: Optional[str] = None) -> Optional[str]:
"""获取代理 URL策略与注册流程一致代理列表 → 动态代理 → 静态配置"""
if request_proxy:
@@ -537,6 +631,351 @@ async def export_accounts_cpa(request: BatchExportRequest):
)
@router.post("/export/codex_auth")
async def export_accounts_codex_auth(request: BatchExportRequest):
"""导出账号为 Codex CLI auth.json 格式"""
with get_db() as db:
ids = resolve_account_ids(
db, request.ids, request.select_all,
request.status_filter, request.email_service_filter, request.search_filter
)
accounts = db.query(Account).filter(Account.id.in_(ids)).all()
if not accounts:
raise HTTPException(status_code=400, detail="没有可导出的账号")
_ensure_codex_auth_export_ready(accounts)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
def build_auth_json(acc):
return {
"auth_mode": "chatgpt",
"OPENAI_API_KEY": None,
"tokens": {
"id_token": acc.id_token or "",
"access_token": acc.access_token or "",
"refresh_token": acc.refresh_token or "",
"account_id": acc.account_id or ""
},
"last_refresh": acc.last_refresh.isoformat() if acc.last_refresh else ""
}
if len(accounts) == 1:
acc = accounts[0]
auth_data = build_auth_json(acc)
content = json.dumps(auth_data, ensure_ascii=False, indent=2)
filename = "auth.json"
return StreamingResponse(
iter([content]),
media_type="application/json",
headers={"Content-Disposition": f"attachment; filename={filename}"}
)
# 多个账号打包为 ZIP
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
for acc in accounts:
auth_data = build_auth_json(acc)
content = json.dumps(auth_data, ensure_ascii=False, indent=2)
zf.writestr(f"{acc.email}/auth.json", content)
zip_buffer.seek(0)
zip_filename = f"codex_auth_{timestamp}.zip"
return StreamingResponse(
zip_buffer,
media_type="application/zip",
headers={"Content-Disposition": f"attachment; filename={zip_filename}"}
)
# ============== Codex Auth 登录导出 ==============
def _build_email_service_for_account(db, account: Account):
"""根据账号的邮箱服务类型,复用收件箱逻辑构建邮箱服务实例(用于读取 OTP"""
from ...services import EmailServiceFactory, EmailServiceType
email_service_type = account.email_service
if not email_service_type:
raise ValueError(f"账号 {account.email} 没有关联的邮箱服务类型")
try:
service_type = EmailServiceType(email_service_type)
except ValueError:
raise ValueError(f"不支持的邮箱服务类型: {email_service_type}")
config = _build_inbox_config(db, service_type, account.email)
if config is None:
raise ValueError(f"未找到可用的 {email_service_type} 邮箱服务配置")
# 添加代理
proxy_url = _get_proxy()
if proxy_url and 'proxy_url' not in config:
config['proxy_url'] = proxy_url
return EmailServiceFactory.create(service_type, config)
class CodexAuthLoginRequest(BaseModel):
"""Codex Auth 登录请求"""
account_id: int
@router.post("/codex-auth-login")
async def codex_auth_login(request: CodexAuthLoginRequest):
"""
对指定账号执行 Codex CLI 登录流程,获取 Codex 兼容的 auth.json。
使用 SSE 推送实时日志,最终返回 auth.json 数据。
"""
import queue
with get_db() as db:
account = db.query(Account).filter(Account.id == request.account_id).first()
if not account:
raise HTTPException(status_code=404, detail="账号不存在")
if not account.password:
raise HTTPException(status_code=400, detail=f"账号 {account.email} 没有密码,无法登录")
# 提取需要的数据(避免跨线程 session 问题)
email = account.email
password = account.password
account_db_id = account.id
email_svc_id = account.email_service_id
try:
email_service = _build_email_service_for_account(db, account)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
proxy_url = _get_proxy()
log_queue = queue.Queue()
def log_callback(msg: str):
log_queue.put(("log", msg))
def run_login():
from ...core.codex_auth import CodexAuthEngine
try:
engine = CodexAuthEngine(
email=email,
password=password,
email_service=email_service,
proxy_url=proxy_url,
callback_logger=log_callback,
email_service_id=email_svc_id,
)
result = engine.run()
log_queue.put(("result", {
"success": result.success,
"email": result.email,
"workspace_id": result.workspace_id,
"auth_json": result.auth_json,
"error_message": result.error_message,
}))
except Exception as e:
log_queue.put(("result", {
"success": False,
"email": email,
"workspace_id": "",
"auth_json": None,
"error_message": str(e),
}))
async def event_generator():
thread = threading.Thread(target=run_login, daemon=True)
thread.start()
while True:
try:
# 非阻塞轮询队列
try:
msg_type, msg_data = log_queue.get_nowait()
except queue.Empty:
await asyncio.sleep(0.3)
if not thread.is_alive() and log_queue.empty():
break
continue
if msg_type == "log":
yield f"data: {json.dumps({'type': 'log', 'message': msg_data}, ensure_ascii=False)}\n\n"
elif msg_type == "result":
# 如果登录成功,同时更新数据库中的 token
if msg_data["success"] and msg_data["auth_json"]:
try:
with get_db() as db:
_persist_codex_auth_result(
db,
account_id=account_db_id,
auth_json=msg_data["auth_json"],
workspace_id=str(msg_data.get("workspace_id") or "").strip(),
)
except Exception as e:
logger.warning(f"更新数据库 token 失败: {e}")
yield f"data: {json.dumps({'type': 'result', **msg_data}, ensure_ascii=False)}\n\n"
break
except Exception:
break
thread.join(timeout=5)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
class CodexAuthBatchRequest(BaseModel):
"""批量 Codex Auth 登录请求"""
ids: List[int] = []
select_all: bool = False
status_filter: Optional[str] = None
email_service_filter: Optional[str] = None
search_filter: Optional[str] = None
@router.post("/codex-auth-login/batch")
async def codex_auth_login_batch(request: CodexAuthBatchRequest):
"""
批量 Codex Auth 登录。
逐个执行登录,通过 SSE 推送每个账号的进度和结果。
全部完成后打包下载。
"""
import queue
with get_db() as db:
ids = resolve_account_ids(
db, request.ids, request.select_all,
request.status_filter, request.email_service_filter, request.search_filter
)
accounts_data = []
for acc in db.query(Account).filter(Account.id.in_(ids)).all():
if not acc.password:
continue
accounts_data.append({
"id": acc.id,
"email": acc.email,
"password": acc.password,
"email_service": acc.email_service,
"email_service_id": acc.email_service_id,
})
if not accounts_data:
raise HTTPException(status_code=400, detail="没有符合条件的账号(需要有密码)")
log_queue = queue.Queue()
def run_batch():
from ...core.codex_auth import CodexAuthEngine
results = []
for i, acc_data in enumerate(accounts_data):
log_queue.put(("progress", {
"current": i + 1,
"total": len(accounts_data),
"email": acc_data["email"],
}))
try:
with get_db() as db:
account = db.query(Account).filter(Account.id == acc_data["id"]).first()
if not account:
continue
email_service = _build_email_service_for_account(db, account)
proxy_url = _get_proxy()
def log_cb(msg, email=acc_data["email"]):
log_queue.put(("log", f"[{email}] {msg}"))
engine = CodexAuthEngine(
email=acc_data["email"],
password=acc_data["password"],
email_service=email_service,
proxy_url=proxy_url,
callback_logger=log_cb,
email_service_id=acc_data.get("email_service_id"),
)
result = engine.run()
if result.success and result.auth_json:
# 更新数据库
try:
with get_db() as db:
_persist_codex_auth_result(
db,
account_id=acc_data["id"],
auth_json=result.auth_json,
workspace_id=str(result.workspace_id or "").strip(),
)
except Exception as e:
logger.warning(f"更新数据库 token 失败: {e}")
results.append({
"email": acc_data["email"],
"workspace_id": result.workspace_id,
"auth_json": result.auth_json,
})
log_queue.put(("account_result", {
"email": acc_data["email"],
"success": True,
}))
else:
log_queue.put(("account_result", {
"email": acc_data["email"],
"success": False,
"error": result.error_message,
}))
except Exception as e:
log_queue.put(("account_result", {
"email": acc_data["email"],
"success": False,
"error": str(e),
}))
log_queue.put(("batch_done", results))
async def event_generator():
thread = threading.Thread(target=run_batch, daemon=True)
thread.start()
while True:
try:
try:
msg_type, msg_data = log_queue.get_nowait()
except queue.Empty:
await asyncio.sleep(0.3)
if not thread.is_alive() and log_queue.empty():
break
continue
if msg_type == "batch_done":
yield f"data: {json.dumps({'type': 'batch_done', 'results': msg_data}, ensure_ascii=False)}\n\n"
break
else:
yield f"data: {json.dumps({'type': msg_type, **msg_data} if isinstance(msg_data, dict) else {'type': msg_type, 'message': msg_data}, ensure_ascii=False)}\n\n"
except Exception:
break
thread.join(timeout=5)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@router.get("/stats/summary")
async def get_accounts_stats():
"""获取账号统计信息"""