From 4e5c53f627fecbe0dd35d7c22377ba864af20658 Mon Sep 17 00:00:00 2001 From: cnlimiter Date: Mon, 16 Mar 2026 10:12:40 +0800 Subject: [PATCH] =?UTF-8?q?feat(ui):=20=E4=B8=BA=E6=89=B9=E9=87=8F?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E6=B7=BB=E5=8A=A0=E5=B9=B6=E5=8F=91=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E9=80=89=E6=8B=A9=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在前端界面添加并发模式(流水线/并行)和并发数配置 - 后端支持并发执行批量任务,线程池扩展至50个并发 - 优化批量任务日志显示,添加任务编号前缀 --- src/web/routes/registration.py | 480 ++++++++++++++++----------------- src/web/task_manager.py | 11 +- static/js/app.js | 41 ++- templates/index.html | 46 +++- 4 files changed, 320 insertions(+), 258 deletions(-) diff --git a/src/web/routes/registration.py b/src/web/routes/registration.py index 6484e41..ad748d7 100644 --- a/src/web/routes/registration.py +++ b/src/web/routes/registration.py @@ -82,6 +82,8 @@ class BatchRegistrationRequest(BaseModel): email_service_id: Optional[int] = None # 使用数据库中已配置的邮箱服务 ID interval_min: int = 5 # 最小间隔秒数 interval_max: int = 30 # 最大间隔秒数 + concurrency: int = 1 # 并发线程数 (1-50) + mode: str = "pipeline" # 执行模式: "parallel" 或 "pipeline" class RegistrationTaskResponse(BaseModel): @@ -142,6 +144,8 @@ class OutlookBatchRegistrationRequest(BaseModel): proxy: Optional[str] = None interval_min: int = 5 interval_max: int = 30 + concurrency: int = 1 # 并发线程数 (1-50) + mode: str = "pipeline" # 执行模式: "parallel" 或 "pipeline" class OutlookBatchRegistrationResponse(BaseModel): @@ -172,7 +176,7 @@ def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse: ) -def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None): +def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""): """ 在线程池中执行的同步注册任务 @@ -310,7 +314,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: email_service = EmailServiceFactory.create(service_type, config) # 创建注册引擎 - 使用 TaskManager 的日志回调 - log_callback = task_manager.create_log_callback(task_uuid) + log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix) engine = RegistrationEngine( email_service=email_service, @@ -373,7 +377,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: pass -async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None): +async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""): """ 异步执行注册任务 @@ -386,10 +390,10 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy: # 初始化 TaskManager 状态 task_manager.update_status(task_uuid, "pending") - task_manager.add_log(task_uuid, f"[系统] 任务 {task_uuid[:8]} 已加入队列") + task_manager.add_log(task_uuid, f"{log_prefix} [系统] 任务 {task_uuid[:8]} 已加入队列" if log_prefix else f"[系统] 任务 {task_uuid[:8]} 已加入队列") try: - # 在线程池中执行同步任务 + # 在线程池中执行同步任务(传入 log_prefix 供回调使用) await loop.run_in_executor( task_manager.executor, _run_sync_registration_task, @@ -397,7 +401,8 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy: email_service_type, proxy, email_service_config, - email_service_id + email_service_id, + log_prefix ) except Exception as e: logger.error(f"线程池执行异常: {task_uuid}, 错误: {e}") @@ -405,6 +410,168 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy: task_manager.update_status(task_uuid, "failed", error=str(e)) +def _init_batch_state(batch_id: str, task_uuids: List[str]): + """初始化批量任务内存状态""" + task_manager.init_batch(batch_id, len(task_uuids)) + batch_tasks[batch_id] = { + "total": len(task_uuids), + "completed": 0, + "success": 0, + "failed": 0, + "cancelled": False, + "task_uuids": task_uuids, + "current_index": 0, + "logs": [], + "finished": False + } + + +def _make_batch_helpers(batch_id: str): + """返回 add_batch_log 和 update_batch_status 辅助函数""" + def add_batch_log(msg: str): + batch_tasks[batch_id]["logs"].append(msg) + task_manager.add_batch_log(batch_id, msg) + + def update_batch_status(**kwargs): + for key, value in kwargs.items(): + if key in batch_tasks[batch_id]: + batch_tasks[batch_id][key] = value + task_manager.update_batch_status(batch_id, **kwargs) + + return add_batch_log, update_batch_status + + +async def run_batch_parallel( + batch_id: str, + task_uuids: List[str], + email_service_type: str, + proxy: Optional[str], + email_service_config: Optional[dict], + email_service_id: Optional[int], + concurrency: int +): + """ + 并行模式:所有任务同时提交,Semaphore 控制最大并发数 + """ + _init_batch_state(batch_id, task_uuids) + add_batch_log, update_batch_status = _make_batch_helpers(batch_id) + semaphore = asyncio.Semaphore(concurrency) + add_batch_log(f"[系统] 并行模式启动,并发数: {concurrency},总任务: {len(task_uuids)}") + + async def _run_one(idx: int, uuid: str): + prefix = f"[任务{idx + 1}]" + async with semaphore: + await run_registration_task( + uuid, email_service_type, proxy, email_service_config, email_service_id, + log_prefix=prefix + ) + with get_db() as db: + t = crud.get_registration_task(db, uuid) + if t: + new_completed = batch_tasks[batch_id]["completed"] + 1 + new_success = batch_tasks[batch_id]["success"] + new_failed = batch_tasks[batch_id]["failed"] + if t.status == "completed": + new_success += 1 + add_batch_log(f"{prefix} [成功] 注册成功") + elif t.status == "failed": + new_failed += 1 + add_batch_log(f"{prefix} [失败] 注册失败: {t.error_message}") + update_batch_status(completed=new_completed, success=new_success, failed=new_failed) + + try: + await asyncio.gather(*[_run_one(i, u) for i, u in enumerate(task_uuids)], return_exceptions=True) + if not task_manager.is_batch_cancelled(batch_id): + add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}") + update_batch_status(finished=True, status="completed") + else: + update_batch_status(finished=True, status="cancelled") + except Exception as e: + logger.error(f"批量任务 {batch_id} 异常: {e}") + add_batch_log(f"[错误] 批量任务异常: {str(e)}") + update_batch_status(finished=True, status="failed") + finally: + batch_tasks[batch_id]["finished"] = True + + +async def run_batch_pipeline( + batch_id: str, + task_uuids: List[str], + email_service_type: str, + proxy: Optional[str], + email_service_config: Optional[dict], + email_service_id: Optional[int], + interval_min: int, + interval_max: int, + concurrency: int +): + """ + 流水线模式:每隔 interval 秒启动一个新任务,Semaphore 限制最大并发数 + """ + _init_batch_state(batch_id, task_uuids) + add_batch_log, update_batch_status = _make_batch_helpers(batch_id) + semaphore = asyncio.Semaphore(concurrency) + running_tasks_list = [] + add_batch_log(f"[系统] 流水线模式启动,并发数: {concurrency},总任务: {len(task_uuids)}") + + async def _run_and_release(idx: int, uuid: str, pfx: str): + try: + await run_registration_task( + uuid, email_service_type, proxy, email_service_config, email_service_id, + log_prefix=pfx + ) + with get_db() as db: + t = crud.get_registration_task(db, uuid) + if t: + new_completed = batch_tasks[batch_id]["completed"] + 1 + new_success = batch_tasks[batch_id]["success"] + new_failed = batch_tasks[batch_id]["failed"] + if t.status == "completed": + new_success += 1 + add_batch_log(f"{pfx} [成功] 注册成功") + elif t.status == "failed": + new_failed += 1 + add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}") + update_batch_status(completed=new_completed, success=new_success, failed=new_failed) + finally: + semaphore.release() + + try: + for i, task_uuid in enumerate(task_uuids): + if task_manager.is_batch_cancelled(batch_id) or batch_tasks[batch_id]["cancelled"]: + with get_db() as db: + for remaining_uuid in task_uuids[i:]: + crud.update_registration_task(db, remaining_uuid, status="cancelled") + add_batch_log("[取消] 批量任务已取消") + update_batch_status(finished=True, status="cancelled") + break + + update_batch_status(current_index=i) + await semaphore.acquire() + prefix = f"[任务{i + 1}]" + add_batch_log(f"{prefix} 开始注册...") + t = asyncio.create_task(_run_and_release(i, task_uuid, prefix)) + running_tasks_list.append(t) + + if i < len(task_uuids) - 1 and not task_manager.is_batch_cancelled(batch_id): + wait_time = random.randint(interval_min, interval_max) + logger.info(f"批量任务 {batch_id}: 等待 {wait_time} 秒后启动下一个任务") + await asyncio.sleep(wait_time) + + if running_tasks_list: + await asyncio.gather(*running_tasks_list, return_exceptions=True) + + if not task_manager.is_batch_cancelled(batch_id): + add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}") + update_batch_status(finished=True, status="completed") + except Exception as e: + logger.error(f"批量任务 {batch_id} 异常: {e}") + add_batch_log(f"[错误] 批量任务异常: {str(e)}") + update_batch_status(finished=True, status="failed") + finally: + batch_tasks[batch_id]["finished"] = True + + async def run_batch_registration( batch_id: str, task_uuids: List[str], @@ -413,95 +580,22 @@ async def run_batch_registration( email_service_config: Optional[dict], email_service_id: Optional[int], interval_min: int, - interval_max: int + interval_max: int, + concurrency: int = 1, + mode: str = "pipeline" ): - """ - 异步执行批量注册任务 - - 使用线程池执行每个注册任务,避免阻塞主事件循环 - """ - # 初始化 TaskManager 批量任务(支持 WebSocket 推送) - task_manager.init_batch(batch_id, len(task_uuids)) - - batch_tasks[batch_id] = { - "total": len(task_uuids), - "completed": 0, - "success": 0, - "failed": 0, - "cancelled": False, - "task_uuids": task_uuids, - "current_index": 0 - } - - def add_batch_log(msg: str): - batch_tasks[batch_id]["logs"] = batch_tasks[batch_id].get("logs", []) - batch_tasks[batch_id]["logs"].append(msg) - task_manager.add_batch_log(batch_id, msg) - - def update_batch_status(**kwargs): - for key, value in kwargs.items(): - if key in batch_tasks[batch_id]: - batch_tasks[batch_id][key] = value - task_manager.update_batch_status(batch_id, **kwargs) - - try: - for i, task_uuid in enumerate(task_uuids): - # 检查是否已取消 - if task_manager.is_batch_cancelled(batch_id) or batch_tasks[batch_id]["cancelled"]: - # 取消剩余任务 - with get_db() as db: - for remaining_uuid in task_uuids[i:]: - crud.update_registration_task(db, remaining_uuid, status="cancelled") - add_batch_log(f"[取消] 批量任务已取消") - update_batch_status(finished=True, status="cancelled") - logger.info(f"批量任务 {batch_id} 已取消") - break - - update_batch_status(current_index=i) - - # 运行单个注册任务(使用线程池) - await run_registration_task( - task_uuid, email_service_type, proxy, email_service_config, email_service_id - ) - - # 更新统计 - with get_db() as db: - task = crud.get_registration_task(db, task_uuid) - if task: - new_completed = batch_tasks[batch_id]["completed"] + 1 - new_success = batch_tasks[batch_id]["success"] - new_failed = batch_tasks[batch_id]["failed"] - - if task.status == "completed": - new_success += 1 - add_batch_log(f"[成功] 第 {new_success} 个账号注册成功") - elif task.status == "failed": - new_failed += 1 - add_batch_log(f"[失败] 第 {new_failed} 个账号注册失败: {task.error_message}") - - update_batch_status( - completed=new_completed, - success=new_success, - failed=new_failed - ) - - # 如果不是最后一个任务,等待随机间隔 - if i < len(task_uuids) - 1 and not task_manager.is_batch_cancelled(batch_id): - wait_time = random.randint(interval_min, interval_max) - logger.info(f"批量任务 {batch_id}: 等待 {wait_time} 秒后继续下一个任务") - await asyncio.sleep(wait_time) - - if not task_manager.is_batch_cancelled(batch_id): - add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}") - update_batch_status(finished=True, status="completed") - logger.info(f"批量任务 {batch_id} 完成: 成功 {batch_tasks[batch_id]['success']}, 失败 {batch_tasks[batch_id]['failed']}") - - except Exception as e: - logger.error(f"批量任务 {batch_id} 异常: {e}") - add_batch_log(f"[错误] 批量任务异常: {str(e)}") - update_batch_status(finished=True, status="failed") - finally: - batch_tasks[batch_id]["finished"] = True + """根据 mode 分发到并行或流水线执行""" + if mode == "parallel": + await run_batch_parallel( + batch_id, task_uuids, email_service_type, proxy, + email_service_config, email_service_id, concurrency + ) + else: + await run_batch_pipeline( + batch_id, task_uuids, email_service_type, proxy, + email_service_config, email_service_id, + interval_min, interval_max, concurrency + ) # ============== API Endpoints ============== @@ -579,6 +673,12 @@ async def start_batch_registration( if request.interval_min < 0 or request.interval_max < request.interval_min: raise HTTPException(status_code=400, detail="间隔时间参数无效") + if not 1 <= request.concurrency <= 50: + raise HTTPException(status_code=400, detail="并发数必须在 1-50 之间") + + if request.mode not in ("parallel", "pipeline"): + raise HTTPException(status_code=400, detail="模式必须为 parallel 或 pipeline") + # 创建批量任务 batch_id = str(uuid.uuid4()) task_uuids = [] @@ -607,7 +707,9 @@ async def start_batch_registration( request.email_service_config, request.email_service_id, request.interval_min, - request.interval_max + request.interval_max, + request.concurrency, + request.mode ) return BatchRegistrationResponse( @@ -903,168 +1005,52 @@ async def get_outlook_accounts_for_registration(): ) -def _run_sync_outlook_batch_registration( - batch_id: str, - service_ids: List[int], - skip_registered: bool, - proxy: Optional[str], - interval_min: int, - interval_max: int -): - """ - 在线程池中执行的同步 Outlook 批量注册任务 - """ - from ...database.models import EmailService as EmailServiceModel - from ...database.models import Account - - # 初始化 TaskManager 批量任务 - task_manager.init_batch(batch_id, len(service_ids)) - - # 兼容旧版 batch_tasks(用于 REST API 轮询降级) - batch_tasks[batch_id] = { - "total": len(service_ids), - "completed": 0, - "success": 0, - "failed": 0, - "skipped": 0, - "cancelled": False, - "service_ids": service_ids, - "current_index": 0, - "logs": [] - } - - def add_batch_log(msg: str): - """同时添加日志到两个系统""" - batch_tasks[batch_id]["logs"].append(msg) - task_manager.add_batch_log(batch_id, msg) - - def update_batch_status(**kwargs): - """同时更新两个系统的状态""" - for key, value in kwargs.items(): - if key in batch_tasks[batch_id]: - batch_tasks[batch_id][key] = value - task_manager.update_batch_status(batch_id, **kwargs) - - try: - for i, service_id in enumerate(service_ids): - # 检查是否已取消 - if task_manager.is_batch_cancelled(batch_id): - add_batch_log(f"[取消] 批量任务已取消") - update_batch_status(finished=True, status="cancelled") - logger.info(f"Outlook 批量任务 {batch_id} 已取消") - break - - update_batch_status(current_index=i) - - with get_db() as db: - # 获取邮箱服务 - service = db.query(EmailServiceModel).filter( - EmailServiceModel.id == service_id - ).first() - - if not service: - add_batch_log(f"[跳过] 服务 ID {service_id} 不存在") - update_batch_status(skipped=batch_tasks[batch_id]["skipped"] + 1, - completed=batch_tasks[batch_id]["completed"] + 1) - continue - - config = service.config or {} - email = config.get("email") or service.name - - # 检查是否已注册 - if skip_registered: - existing_account = db.query(Account).filter( - Account.email == email - ).first() - - if existing_account: - add_batch_log(f"[跳过] {email} 已注册 (账号 ID: {existing_account.id})") - update_batch_status(skipped=batch_tasks[batch_id]["skipped"] + 1, - completed=batch_tasks[batch_id]["completed"] + 1) - continue - - # 创建注册任务 - task_uuid = str(uuid.uuid4()) - task = crud.create_registration_task( - db, - task_uuid=task_uuid, - proxy=proxy, - email_service_id=service_id - ) - - add_batch_log(f"[注册] 开始注册 {email}...") - - # 同步执行注册任务 - _run_sync_registration_task(task_uuid, "outlook", proxy, None, service_id) - - # 更新统计 - with get_db() as db: - task = crud.get_registration_task(db, task_uuid) - if task: - new_completed = batch_tasks[batch_id]["completed"] + 1 - new_success = batch_tasks[batch_id]["success"] - new_failed = batch_tasks[batch_id]["failed"] - - if task.status == "completed": - new_success += 1 - add_batch_log(f"[成功] {email} 注册成功") - elif task.status == "failed": - new_failed += 1 - add_batch_log(f"[失败] {email} 注册失败: {task.error_message}") - - update_batch_status( - completed=new_completed, - success=new_success, - failed=new_failed - ) - - # 如果不是最后一个任务,等待随机间隔 - if i < len(service_ids) - 1 and not task_manager.is_batch_cancelled(batch_id): - wait_time = random.randint(interval_min, interval_max) - logger.info(f"Outlook 批量任务 {batch_id}: 等待 {wait_time} 秒后继续下一个任务") - import time - time.sleep(wait_time) - - # 完成批量任务 - if not task_manager.is_batch_cancelled(batch_id): - add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}, 跳过: {batch_tasks[batch_id]['skipped']}") - update_batch_status(finished=True, status="completed") - logger.info(f"Outlook 批量任务 {batch_id} 完成: 成功 {batch_tasks[batch_id]['success']}, 失败 {batch_tasks[batch_id]['failed']}, 跳过 {batch_tasks[batch_id]['skipped']}") - - except Exception as e: - logger.error(f"Outlook 批量任务 {batch_id} 异常: {e}") - add_batch_log(f"[错误] 批量任务异常: {str(e)}") - update_batch_status(finished=True, status="failed") - - async def run_outlook_batch_registration( batch_id: str, service_ids: List[int], skip_registered: bool, proxy: Optional[str], interval_min: int, - interval_max: int + interval_max: int, + concurrency: int = 1, + mode: str = "pipeline" ): """ - 异步执行 Outlook 批量注册任务 + 异步执行 Outlook 批量注册任务,复用通用并发逻辑 - 使用线程池执行,避免阻塞主事件循环 + 将每个 service_id 映射为一个独立的 task_uuid,然后调用 + run_batch_registration 的并发逻辑 """ loop = task_manager.get_loop() if loop is None: loop = asyncio.get_event_loop() task_manager.set_loop(loop) - # 在线程池中执行 - await loop.run_in_executor( - task_manager.executor, - _run_sync_outlook_batch_registration, - batch_id, - service_ids, - skip_registered, - proxy, - interval_min, - interval_max + # 预先为每个 service_id 创建注册任务记录 + task_uuids = [] + with get_db() as db: + for service_id in service_ids: + task_uuid = str(uuid.uuid4()) + crud.create_registration_task( + db, + task_uuid=task_uuid, + proxy=proxy, + email_service_id=service_id + ) + task_uuids.append(task_uuid) + + # 复用通用并发逻辑(outlook 服务类型,每个任务通过 email_service_id 定位账户) + await run_batch_registration( + batch_id=batch_id, + task_uuids=task_uuids, + email_service_type="outlook", + proxy=proxy, + email_service_config=None, + email_service_id=None, # 每个任务已绑定了独立的 email_service_id + interval_min=interval_min, + interval_max=interval_max, + concurrency=concurrency, + mode=mode ) @@ -1092,6 +1078,12 @@ async def start_outlook_batch_registration( if request.interval_min < 0 or request.interval_max < request.interval_min: raise HTTPException(status_code=400, detail="间隔时间参数无效") + if not 1 <= request.concurrency <= 50: + raise HTTPException(status_code=400, detail="并发数必须在 1-50 之间") + + if request.mode not in ("parallel", "pipeline"): + raise HTTPException(status_code=400, detail="模式必须为 parallel 或 pipeline") + # 过滤掉已注册的邮箱 actual_service_ids = request.service_ids skipped_count = 0 @@ -1154,7 +1146,9 @@ async def start_outlook_batch_registration( request.skip_registered, request.proxy, request.interval_min, - request.interval_max + request.interval_max, + request.concurrency, + request.mode ) return OutlookBatchRegistrationResponse( diff --git a/src/web/task_manager.py b/src/web/task_manager.py index 8079600..c82dcbf 100644 --- a/src/web/task_manager.py +++ b/src/web/task_manager.py @@ -13,8 +13,8 @@ from datetime import datetime logger = logging.getLogger(__name__) -# 全局线程池 -_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="reg_worker") +# 全局线程池(支持最多 50 个并发注册任务) +_executor = ThreadPoolExecutor(max_workers=50, thread_name_prefix="reg_worker") # 任务日志队列 (task_uuid -> list of logs) _log_queues: Dict[str, List[str]] = defaultdict(list) @@ -344,10 +344,11 @@ class TaskManager: _ws_sent_index[key].pop(id(websocket), None) logger.info(f"批量任务 WebSocket 连接已注销: {batch_id}") - def create_log_callback(self, task_uuid: str) -> Callable[[str], None]: - """创建日志回调函数""" + def create_log_callback(self, task_uuid: str, prefix: str = "") -> Callable[[str], None]: + """创建日志回调函数,可附加任务编号前缀""" def callback(msg: str): - self.add_log(task_uuid, msg) + full_msg = f"{prefix} {msg}" if prefix else msg + self.add_log(task_uuid, full_msg) return callback def create_check_cancelled_callback(self, task_uuid: str) -> Callable[[], bool]: diff --git a/static/js/app.js b/static/js/app.js index f0edef6..3407ed0 100644 --- a/static/js/app.js +++ b/static/js/app.js @@ -71,7 +71,16 @@ const elements = { outlookAccountsContainer: document.getElementById('outlook-accounts-container'), outlookIntervalMin: document.getElementById('outlook-interval-min'), outlookIntervalMax: document.getElementById('outlook-interval-max'), - outlookSkipRegistered: document.getElementById('outlook-skip-registered') + outlookSkipRegistered: document.getElementById('outlook-skip-registered'), + outlookConcurrencyMode: document.getElementById('outlook-concurrency-mode'), + outlookConcurrencyCount: document.getElementById('outlook-concurrency-count'), + outlookConcurrencyHint: document.getElementById('outlook-concurrency-hint'), + outlookIntervalGroup: document.getElementById('outlook-interval-group'), + // 批量并发控件 + concurrencyMode: document.getElementById('concurrency-mode'), + concurrencyCount: document.getElementById('concurrency-count'), + concurrencyHint: document.getElementById('concurrency-hint'), + intervalGroup: document.getElementById('interval-group') }; // 初始化 @@ -109,6 +118,14 @@ function initEventListeners() { loadRecentAccounts(); toast.info('已刷新'); }); + + // 并发模式切换 + elements.concurrencyMode.addEventListener('change', () => { + handleConcurrencyModeChange(elements.concurrencyMode, elements.concurrencyHint, elements.intervalGroup); + }); + elements.outlookConcurrencyMode.addEventListener('change', () => { + handleConcurrencyModeChange(elements.outlookConcurrencyMode, elements.outlookConcurrencyHint, elements.outlookIntervalGroup); + }); } // 加载可用的邮箱服务 @@ -261,6 +278,18 @@ function handleModeChange(e) { elements.batchOptions.style.display = isBatchMode ? 'block' : 'none'; } +// 并发模式切换(批量) +function handleConcurrencyModeChange(selectEl, hintEl, intervalGroupEl) { + const mode = selectEl.value; + if (mode === 'parallel') { + hintEl.textContent = '所有任务分成 N 个并发批次同时执行'; + intervalGroupEl.style.display = 'none'; + } else { + hintEl.textContent = '同时最多运行 N 个任务,每隔 interval 秒启动新任务'; + intervalGroupEl.style.display = 'block'; + } +} + // 开始注册 async function handleStartRegistration(e) { e.preventDefault(); @@ -472,10 +501,14 @@ async function handleBatchRegistration(requestData) { const count = parseInt(elements.batchCount.value) || 5; const intervalMin = parseInt(elements.intervalMin.value) || 5; const intervalMax = parseInt(elements.intervalMax.value) || 30; + const concurrency = parseInt(elements.concurrencyCount.value) || 3; + const mode = elements.concurrencyMode.value || 'pipeline'; requestData.count = count; requestData.interval_min = intervalMin; requestData.interval_max = intervalMax; + requestData.concurrency = Math.min(50, Math.max(1, concurrency)); + requestData.mode = mode; addLog('info', `[系统] 正在启动批量注册任务 (数量: ${count})...`); @@ -966,6 +999,8 @@ async function handleOutlookBatchRegistration() { const intervalMin = parseInt(elements.outlookIntervalMin.value) || 5; const intervalMax = parseInt(elements.outlookIntervalMax.value) || 30; const skipRegistered = elements.outlookSkipRegistered.checked; + const concurrency = parseInt(elements.outlookConcurrencyCount.value) || 3; + const mode = elements.outlookConcurrencyMode.value || 'pipeline'; // 禁用开始按钮 elements.startBtn.disabled = true; @@ -978,7 +1013,9 @@ async function handleOutlookBatchRegistration() { service_ids: selectedIds, skip_registered: skipRegistered, interval_min: intervalMin, - interval_max: intervalMax + interval_max: intervalMax, + concurrency: Math.min(50, Math.max(1, concurrency)), + mode: mode }; addLog('info', `[系统] 正在启动 Outlook 批量注册 (${selectedIds.length} 个账户)...`); diff --git a/templates/index.html b/templates/index.html index 4b6760b..61bea89 100644 --- a/templates/index.html +++ b/templates/index.html @@ -150,12 +150,26 @@
- - + +
- - + + + 同时最多运行 N 个任务,每隔 interval 秒启动新任务 +
+
+
+ + +
+
+ + +