fix: broadcast single task completion status to web ui

This commit is contained in:
zhoukailian
2026-03-23 20:46:40 +08:00
committed by Mison
parent 9fdc7a882d
commit 1db7642a43
4 changed files with 216 additions and 7 deletions

View File

@@ -246,9 +246,6 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
logger.error(f"任务不存在: {task_uuid}")
return
# 更新 TaskManager 状态
task_manager.update_status(task_uuid, "running")
# 确定使用的代理
# 如果前端传入了代理参数,使用传入的
# 否则从代理列表或系统设置中获取
@@ -393,6 +390,9 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
email_service = EmailServiceFactory.create(service_type, config)
# 在 WebSocket 状态里附带邮箱服务类型,前端可同步更新任务卡片
task_manager.update_status(task_uuid, "running", email_service=service_type.value)
# 创建注册引擎 - 使用 TaskManager 的日志回调
log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id)
@@ -504,11 +504,19 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
db, task_uuid,
status="completed",
completed_at=datetime.utcnow(),
result=result.to_dict()
result={
**result.to_dict(),
"email_service": service_type.value,
}
)
# 更新 TaskManager 状态
task_manager.update_status(task_uuid, "completed", email=result.email)
task_manager.update_status(
task_uuid,
"completed",
email=result.email,
email_service=service_type.value,
)
logger.info(f"注册任务完成: {task_uuid}, 邮箱: {result.email}")
else:
@@ -521,7 +529,12 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
)
# 更新 TaskManager 状态
task_manager.update_status(task_uuid, "failed", error=result.error_message)
task_manager.update_status(
task_uuid,
"failed",
error=result.error_message,
email_service=service_type.value,
)
logger.warning(f"注册任务失败: {task_uuid}, 原因: {result.error_message}")
@@ -555,7 +568,7 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
task_manager.set_loop(loop)
# 初始化 TaskManager 状态
task_manager.update_status(task_uuid, "pending")
task_manager.update_status(task_uuid, "pending", email_service=email_service_type)
task_manager.add_log(task_uuid, f"{log_prefix} [系统] 任务 {task_uuid[:8]} 已加入队列" if log_prefix else f"[系统] 任务 {task_uuid[:8]} 已加入队列")
try:

View File

@@ -552,6 +552,12 @@ function connectWebSocket(taskUuid) {
const logType = getLogType(data.message);
addLog(logType, data.message);
} else if (data.type === 'status') {
if (data.email) {
elements.taskEmail.textContent = data.email;
}
if (data.email_service) {
elements.taskService.textContent = getServiceTypeText(data.email_service);
}
updateTaskStatus(data.status);
// 检查是否完成

View File

@@ -0,0 +1,150 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const fs = require('node:fs');
const vm = require('node:vm');
const APP_JS_PATH = '/Users/zhoukailian/.config/superpowers/worktrees/codex-manager/repro-batch-monitor/static/js/app.js';
function createElementStub() {
return {
style: {},
dataset: {},
value: '',
checked: false,
disabled: false,
innerHTML: '',
textContent: '',
className: '',
appendChild() {},
addEventListener() {},
removeEventListener() {},
querySelector() {
return createElementStub();
},
querySelectorAll() {
return [];
},
closest() {
return null;
},
};
}
function createSandbox() {
const elements = new Map();
const sandbox = {
console,
setTimeout,
clearTimeout,
setInterval: () => 1,
clearInterval: () => {},
Event: class Event {
constructor(type) {
this.type = type;
}
},
document: {
getElementById(id) {
if (!elements.has(id)) {
elements.set(id, createElementStub());
}
return elements.get(id);
},
createElement() {
return createElementStub();
},
addEventListener() {},
querySelector() {
return createElementStub();
},
querySelectorAll() {
return [];
},
},
sessionStorage: {
getItem() {
return null;
},
setItem() {},
removeItem() {},
},
toast: {
info() {},
success() {},
warning() {},
error() {},
},
api: {
get() {
throw new Error('api.get should not be called in this test');
},
post() {
throw new Error('api.post should not be called in this test');
},
},
loadRecentAccounts() {},
getServiceTypeText(type) {
return {
tempmail: '临时邮箱',
outlook: 'Outlook',
}[type] || type;
},
window: null,
WebSocket: null,
};
sandbox.window = sandbox;
sandbox.window.location = { protocol: 'http:', host: '127.0.0.1:8005' };
vm.createContext(sandbox);
vm.runInContext(fs.readFileSync(APP_JS_PATH, 'utf8'), sandbox, { filename: 'app.js' });
return { sandbox, elements };
}
test('single task websocket completion updates task info and resets buttons', () => {
const { sandbox, elements } = createSandbox();
vm.runInContext(
`
var __lastWs = null;
startLogPolling = function() {
throw new Error('startLogPolling should not be called for completed status');
};
loadRecentAccounts = function() {};
currentTask = { task_uuid: 'task-1' };
taskCompleted = false;
taskFinalStatus = null;
elements.startBtn.disabled = true;
elements.cancelBtn.disabled = false;
elements.taskStatusRow.style.display = 'grid';
WebSocket = function(url) {
this.url = url;
this.readyState = 0;
__lastWs = this;
};
WebSocket.OPEN = 1;
WebSocket.CLOSED = 3;
WebSocket.prototype.close = function() {
this.readyState = WebSocket.CLOSED;
};
connectWebSocket('task-1');
__lastWs.onmessage({
data: JSON.stringify({
type: 'status',
status: 'completed',
email: 'demo@example.com',
email_service: 'tempmail',
}),
});
`,
sandbox,
);
assert.equal(elements.get('start-btn').disabled, false);
assert.equal(elements.get('cancel-btn').disabled, true);
assert.equal(elements.get('task-status').textContent, '已完成');
assert.equal(elements.get('task-email').textContent, 'demo@example.com');
assert.equal(elements.get('task-service').textContent, '临时邮箱');
});

View File

@@ -0,0 +1,40 @@
import asyncio
from src.web.task_manager import task_manager
class FakeWebSocket:
def __init__(self):
self.messages = []
async def send_json(self, payload):
self.messages.append(payload)
def test_update_status_broadcasts_to_registered_websocket():
async def run_test():
task_uuid = "test-status-broadcast"
websocket = FakeWebSocket()
task_manager.set_loop(asyncio.get_running_loop())
task_manager.register_websocket(task_uuid, websocket)
try:
task_manager.update_status(
task_uuid,
"completed",
email="demo@example.com",
email_service="tempmail",
)
await asyncio.sleep(0.05)
assert websocket.messages, "expected a status message to be broadcast"
assert websocket.messages[-1]["type"] == "status"
assert websocket.messages[-1]["status"] == "completed"
assert websocket.messages[-1]["email"] == "demo@example.com"
assert websocket.messages[-1]["email_service"] == "tempmail"
finally:
task_manager.unregister_websocket(task_uuid, websocket)
asyncio.run(run_test())