From bb75fe08dd915c4a13a01f710d48c574b70badb8 Mon Sep 17 00:00:00 2001 From: cnlimiter Date: Mon, 16 Mar 2026 10:35:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(registration):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E4=BB=BB=E5=8A=A1=E6=97=A5=E5=BF=97=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在批量注册任务中,将单个任务的详细日志同步推送到批量任务频道,使前端能在混合日志中查看每个任务的详细执行步骤。 --- src/web/routes/registration.py | 59 ++++++++++++++++++---------------- src/web/task_manager.py | 7 ++-- 2 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/web/routes/registration.py b/src/web/routes/registration.py index ad748d7..65e16a0 100644 --- a/src/web/routes/registration.py +++ b/src/web/routes/registration.py @@ -176,7 +176,7 @@ def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse: ) -def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""): +def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = ""): """ 在线程池中执行的同步注册任务 @@ -314,7 +314,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: email_service = EmailServiceFactory.create(service_type, config) # 创建注册引擎 - 使用 TaskManager 的日志回调 - log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix) + log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix, batch_id=batch_id) engine = RegistrationEngine( email_service=email_service, @@ -377,7 +377,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: pass -async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""): +async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = ""): """ 异步执行注册任务 @@ -393,7 +393,7 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy: task_manager.add_log(task_uuid, f"{log_prefix} [系统] 任务 {task_uuid[:8]} 已加入队列" if log_prefix else f"[系统] 任务 {task_uuid[:8]} 已加入队列") try: - # 在线程池中执行同步任务(传入 log_prefix 供回调使用) + # 在线程池中执行同步任务(传入 log_prefix 和 batch_id 供回调使用) await loop.run_in_executor( task_manager.executor, _run_sync_registration_task, @@ -402,7 +402,8 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy: proxy, email_service_config, email_service_id, - log_prefix + log_prefix, + batch_id ) except Exception as e: logger.error(f"线程池执行异常: {task_uuid}, 错误: {e}") @@ -456,6 +457,7 @@ async def run_batch_parallel( _init_batch_state(batch_id, task_uuids) add_batch_log, update_batch_status = _make_batch_helpers(batch_id) semaphore = asyncio.Semaphore(concurrency) + counter_lock = asyncio.Lock() add_batch_log(f"[系统] 并行模式启动,并发数: {concurrency},总任务: {len(task_uuids)}") async def _run_one(idx: int, uuid: str): @@ -463,21 +465,22 @@ async def run_batch_parallel( async with semaphore: await run_registration_task( uuid, email_service_type, proxy, email_service_config, email_service_id, - log_prefix=prefix + log_prefix=prefix, batch_id=batch_id ) with get_db() as db: t = crud.get_registration_task(db, uuid) if t: - new_completed = batch_tasks[batch_id]["completed"] + 1 - new_success = batch_tasks[batch_id]["success"] - new_failed = batch_tasks[batch_id]["failed"] - if t.status == "completed": - new_success += 1 - add_batch_log(f"{prefix} [成功] 注册成功") - elif t.status == "failed": - new_failed += 1 - add_batch_log(f"{prefix} [失败] 注册失败: {t.error_message}") - update_batch_status(completed=new_completed, success=new_success, failed=new_failed) + async with counter_lock: + new_completed = batch_tasks[batch_id]["completed"] + 1 + new_success = batch_tasks[batch_id]["success"] + new_failed = batch_tasks[batch_id]["failed"] + if t.status == "completed": + new_success += 1 + add_batch_log(f"{prefix} [成功] 注册成功") + elif t.status == "failed": + new_failed += 1 + add_batch_log(f"{prefix} [失败] 注册失败: {t.error_message}") + update_batch_status(completed=new_completed, success=new_success, failed=new_failed) try: await asyncio.gather(*[_run_one(i, u) for i, u in enumerate(task_uuids)], return_exceptions=True) @@ -511,6 +514,7 @@ async def run_batch_pipeline( _init_batch_state(batch_id, task_uuids) add_batch_log, update_batch_status = _make_batch_helpers(batch_id) semaphore = asyncio.Semaphore(concurrency) + counter_lock = asyncio.Lock() running_tasks_list = [] add_batch_log(f"[系统] 流水线模式启动,并发数: {concurrency},总任务: {len(task_uuids)}") @@ -518,21 +522,22 @@ async def run_batch_pipeline( try: await run_registration_task( uuid, email_service_type, proxy, email_service_config, email_service_id, - log_prefix=pfx + log_prefix=pfx, batch_id=batch_id ) with get_db() as db: t = crud.get_registration_task(db, uuid) if t: - new_completed = batch_tasks[batch_id]["completed"] + 1 - new_success = batch_tasks[batch_id]["success"] - new_failed = batch_tasks[batch_id]["failed"] - if t.status == "completed": - new_success += 1 - add_batch_log(f"{pfx} [成功] 注册成功") - elif t.status == "failed": - new_failed += 1 - add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}") - update_batch_status(completed=new_completed, success=new_success, failed=new_failed) + async with counter_lock: + new_completed = batch_tasks[batch_id]["completed"] + 1 + new_success = batch_tasks[batch_id]["success"] + new_failed = batch_tasks[batch_id]["failed"] + if t.status == "completed": + new_success += 1 + add_batch_log(f"{pfx} [成功] 注册成功") + elif t.status == "failed": + new_failed += 1 + add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}") + update_batch_status(completed=new_completed, success=new_success, failed=new_failed) finally: semaphore.release() diff --git a/src/web/task_manager.py b/src/web/task_manager.py index c82dcbf..b9e31b9 100644 --- a/src/web/task_manager.py +++ b/src/web/task_manager.py @@ -344,11 +344,14 @@ class TaskManager: _ws_sent_index[key].pop(id(websocket), None) logger.info(f"批量任务 WebSocket 连接已注销: {batch_id}") - def create_log_callback(self, task_uuid: str, prefix: str = "") -> Callable[[str], None]: - """创建日志回调函数,可附加任务编号前缀""" + def create_log_callback(self, task_uuid: str, prefix: str = "", batch_id: str = "") -> Callable[[str], None]: + """创建日志回调函数,可附加任务编号前缀,并同时推送到批量任务频道""" def callback(msg: str): full_msg = f"{prefix} {msg}" if prefix else msg self.add_log(task_uuid, full_msg) + # 如果属于批量任务,同步推送到 batch 频道,前端可在混合日志中看到详细步骤 + if batch_id: + self.add_batch_log(batch_id, full_msg) return callback def create_check_cancelled_callback(self, task_uuid: str) -> Callable[[], bool]: