feat(registration): 添加批量任务日志同步功能

在批量注册任务中,将单个任务的详细日志同步推送到批量任务频道,使前端能在混合日志中查看每个任务的详细执行步骤。
This commit is contained in:
cnlimiter
2026-03-16 10:35:06 +08:00
parent 4e5c53f627
commit bb75fe08dd
2 changed files with 37 additions and 29 deletions

View File

@@ -176,7 +176,7 @@ def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse:
)
-def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""):
+def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = ""):
"""
在线程池中执行的同步注册任务
@@ -314,7 +314,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
email_service = EmailServiceFactory.create(service_type, config)
# 创建注册引擎 - 使用 TaskManager 的日志回调
-log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix)
+log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id)
engine = RegistrationEngine(
email_service=email_service,
@@ -377,7 +377,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
pass
-async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""):
+async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = ""):
"""
异步执行注册任务
@@ -393,7 +393,7 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
task_manager.add_log(task_uuid, f"{log_prefix} [系统] 任务 {task_uuid[:8]} 已加入队列" if log_prefix else f"[系统] 任务 {task_uuid[:8]} 已加入队列")
try:
-# 在线程池中执行同步任务(传入 log_prefix 供回调使用)
+# 在线程池中执行同步任务(传入 log_prefix 和 batch_id 供回调使用)
await loop.run_in_executor(
task_manager.executor,
_run_sync_registration_task,
@@ -402,7 +402,8 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
proxy,
email_service_config,
email_service_id,
-log_prefix
+log_prefix,
+batch_id
)
except Exception as e:
logger.error(f"线程池执行异常: {task_uuid}, 错误: {e}")
@@ -456,6 +457,7 @@ async def run_batch_parallel(
_init_batch_state(batch_id, task_uuids)
add_batch_log, update_batch_status = _make_batch_helpers(batch_id)
semaphore = asyncio.Semaphore(concurrency)
+counter_lock = asyncio.Lock()
add_batch_log(f"[系统] 并行模式启动,并发数: {concurrency},总任务: {len(task_uuids)}")
async def _run_one(idx: int, uuid: str):
@@ -463,21 +465,22 @@ async def run_batch_parallel(
async with semaphore:
await run_registration_task(
uuid, email_service_type, proxy, email_service_config, email_service_id,
-log_prefix=prefix
+log_prefix=prefix, batch_id=batch_id
)
with get_db() as db:
t = crud.get_registration_task(db, uuid)
if t:
-new_completed = batch_tasks[batch_id]["completed"] + 1
-new_success = batch_tasks[batch_id]["success"]
-new_failed = batch_tasks[batch_id]["failed"]
-if t.status == "completed":
-new_success += 1
-add_batch_log(f"{prefix} [成功] 注册成功")
-elif t.status == "failed":
-new_failed += 1
-add_batch_log(f"{prefix} [失败] 注册失败: {t.error_message}")
-update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
+async with counter_lock:
+new_completed = batch_tasks[batch_id]["completed"] + 1
+new_success = batch_tasks[batch_id]["success"]
+new_failed = batch_tasks[batch_id]["failed"]
+if t.status == "completed":
+new_success += 1
+add_batch_log(f"{prefix} [成功] 注册成功")
+elif t.status == "failed":
+new_failed += 1
+add_batch_log(f"{prefix} [失败] 注册失败: {t.error_message}")
+update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
try:
await asyncio.gather(*[_run_one(i, u) for i, u in enumerate(task_uuids)], return_exceptions=True)
@@ -511,6 +514,7 @@ async def run_batch_pipeline(
_init_batch_state(batch_id, task_uuids)
add_batch_log, update_batch_status = _make_batch_helpers(batch_id)
semaphore = asyncio.Semaphore(concurrency)
+counter_lock = asyncio.Lock()
running_tasks_list = []
add_batch_log(f"[系统] 流水线模式启动,并发数: {concurrency},总任务: {len(task_uuids)}")
@@ -518,21 +522,22 @@ async def run_batch_pipeline(
try:
await run_registration_task(
uuid, email_service_type, proxy, email_service_config, email_service_id,
-log_prefix=pfx
+log_prefix=pfx, batch_id=batch_id
)
with get_db() as db:
t = crud.get_registration_task(db, uuid)
if t:
-new_completed = batch_tasks[batch_id]["completed"] + 1
-new_success = batch_tasks[batch_id]["success"]
-new_failed = batch_tasks[batch_id]["failed"]
-if t.status == "completed":
-new_success += 1
-add_batch_log(f"{pfx} [成功] 注册成功")
-elif t.status == "failed":
-new_failed += 1
-add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}")
-update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
+async with counter_lock:
+new_completed = batch_tasks[batch_id]["completed"] + 1
+new_success = batch_tasks[batch_id]["success"]
+new_failed = batch_tasks[batch_id]["failed"]
+if t.status == "completed":
+new_success += 1
+add_batch_log(f"{pfx} [成功] 注册成功")
+elif t.status == "failed":
+new_failed += 1
+add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}")
+update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
finally:
semaphore.release()

View File

@@ -344,11 +344,14 @@ class TaskManager:
_ws_sent_index[key].pop(id(websocket), None)
logger.info(f"批量任务 WebSocket 连接已注销: {batch_id}")
-def create_log_callback(self, task_uuid: str, prefix: str = "") -> Callable[[str], None]:
-"""创建日志回调函数,可附加任务编号前缀"""
+def create_log_callback(self, task_uuid: str, prefix: str = "", batch_id: str = "") -> Callable[[str], None]:
+"""创建日志回调函数,可附加任务编号前缀,并同时推送到批量任务频道"""
def callback(msg: str):
full_msg = f"{prefix} {msg}" if prefix else msg
self.add_log(task_uuid, full_msg)
+# 如果属于批量任务,同步推送到 batch 频道,前端可在混合日志中看到详细步骤
+if batch_id:
+self.add_batch_log(batch_id, full_msg)
return callback
def create_check_cancelled_callback(self, task_uuid: str) -> Callable[[], bool]: