mirror of
https://github.com/cnlimiter/codex-register.git
synced 2026-05-06 20:02:51 +08:00
feat(ui): 为批量注册添加并发模式选择功能
- 在前端界面添加并发模式(流水线/并行)和并发数配置 - 后端支持并发执行批量任务,线程池扩展至50个并发 - 优化批量任务日志显示,添加任务编号前缀
This commit is contained in:
@@ -82,6 +82,8 @@ class BatchRegistrationRequest(BaseModel):
|
||||
email_service_id: Optional[int] = None # 使用数据库中已配置的邮箱服务 ID
|
||||
interval_min: int = 5 # 最小间隔秒数
|
||||
interval_max: int = 30 # 最大间隔秒数
|
||||
concurrency: int = 1 # 并发线程数 (1-50)
|
||||
mode: str = "pipeline" # 执行模式: "parallel" 或 "pipeline"
|
||||
|
||||
|
||||
class RegistrationTaskResponse(BaseModel):
|
||||
@@ -142,6 +144,8 @@ class OutlookBatchRegistrationRequest(BaseModel):
|
||||
proxy: Optional[str] = None
|
||||
interval_min: int = 5
|
||||
interval_max: int = 30
|
||||
concurrency: int = 1 # 并发线程数 (1-50)
|
||||
mode: str = "pipeline" # 执行模式: "parallel" 或 "pipeline"
|
||||
|
||||
|
||||
class OutlookBatchRegistrationResponse(BaseModel):
|
||||
@@ -172,7 +176,7 @@ def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse:
|
||||
)
|
||||
|
||||
|
||||
def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None):
|
||||
def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""):
|
||||
"""
|
||||
在线程池中执行的同步注册任务
|
||||
|
||||
@@ -310,7 +314,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
email_service = EmailServiceFactory.create(service_type, config)
|
||||
|
||||
# 创建注册引擎 - 使用 TaskManager 的日志回调
|
||||
log_callback = task_manager.create_log_callback(task_uuid)
|
||||
log_callback = task_manager.create_log_callback(task_uuid, prefix=log_prefix)
|
||||
|
||||
engine = RegistrationEngine(
|
||||
email_service=email_service,
|
||||
@@ -373,7 +377,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
pass
|
||||
|
||||
|
||||
async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None):
|
||||
async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = ""):
|
||||
"""
|
||||
异步执行注册任务
|
||||
|
||||
@@ -386,10 +390,10 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
|
||||
# 初始化 TaskManager 状态
|
||||
task_manager.update_status(task_uuid, "pending")
|
||||
task_manager.add_log(task_uuid, f"[系统] 任务 {task_uuid[:8]} 已加入队列")
|
||||
task_manager.add_log(task_uuid, f"{log_prefix} [系统] 任务 {task_uuid[:8]} 已加入队列" if log_prefix else f"[系统] 任务 {task_uuid[:8]} 已加入队列")
|
||||
|
||||
try:
|
||||
# 在线程池中执行同步任务
|
||||
# 在线程池中执行同步任务(传入 log_prefix 供回调使用)
|
||||
await loop.run_in_executor(
|
||||
task_manager.executor,
|
||||
_run_sync_registration_task,
|
||||
@@ -397,7 +401,8 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
email_service_type,
|
||||
proxy,
|
||||
email_service_config,
|
||||
email_service_id
|
||||
email_service_id,
|
||||
log_prefix
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"线程池执行异常: {task_uuid}, 错误: {e}")
|
||||
@@ -405,6 +410,168 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
task_manager.update_status(task_uuid, "failed", error=str(e))
|
||||
|
||||
|
||||
def _init_batch_state(batch_id: str, task_uuids: List[str]):
|
||||
"""初始化批量任务内存状态"""
|
||||
task_manager.init_batch(batch_id, len(task_uuids))
|
||||
batch_tasks[batch_id] = {
|
||||
"total": len(task_uuids),
|
||||
"completed": 0,
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"cancelled": False,
|
||||
"task_uuids": task_uuids,
|
||||
"current_index": 0,
|
||||
"logs": [],
|
||||
"finished": False
|
||||
}
|
||||
|
||||
|
||||
def _make_batch_helpers(batch_id: str):
|
||||
"""返回 add_batch_log 和 update_batch_status 辅助函数"""
|
||||
def add_batch_log(msg: str):
|
||||
batch_tasks[batch_id]["logs"].append(msg)
|
||||
task_manager.add_batch_log(batch_id, msg)
|
||||
|
||||
def update_batch_status(**kwargs):
|
||||
for key, value in kwargs.items():
|
||||
if key in batch_tasks[batch_id]:
|
||||
batch_tasks[batch_id][key] = value
|
||||
task_manager.update_batch_status(batch_id, **kwargs)
|
||||
|
||||
return add_batch_log, update_batch_status
|
||||
|
||||
|
||||
async def run_batch_parallel(
|
||||
batch_id: str,
|
||||
task_uuids: List[str],
|
||||
email_service_type: str,
|
||||
proxy: Optional[str],
|
||||
email_service_config: Optional[dict],
|
||||
email_service_id: Optional[int],
|
||||
concurrency: int
|
||||
):
|
||||
"""
|
||||
并行模式:所有任务同时提交,Semaphore 控制最大并发数
|
||||
"""
|
||||
_init_batch_state(batch_id, task_uuids)
|
||||
add_batch_log, update_batch_status = _make_batch_helpers(batch_id)
|
||||
semaphore = asyncio.Semaphore(concurrency)
|
||||
add_batch_log(f"[系统] 并行模式启动,并发数: {concurrency},总任务: {len(task_uuids)}")
|
||||
|
||||
async def _run_one(idx: int, uuid: str):
|
||||
prefix = f"[任务{idx + 1}]"
|
||||
async with semaphore:
|
||||
await run_registration_task(
|
||||
uuid, email_service_type, proxy, email_service_config, email_service_id,
|
||||
log_prefix=prefix
|
||||
)
|
||||
with get_db() as db:
|
||||
t = crud.get_registration_task(db, uuid)
|
||||
if t:
|
||||
new_completed = batch_tasks[batch_id]["completed"] + 1
|
||||
new_success = batch_tasks[batch_id]["success"]
|
||||
new_failed = batch_tasks[batch_id]["failed"]
|
||||
if t.status == "completed":
|
||||
new_success += 1
|
||||
add_batch_log(f"{prefix} [成功] 注册成功")
|
||||
elif t.status == "failed":
|
||||
new_failed += 1
|
||||
add_batch_log(f"{prefix} [失败] 注册失败: {t.error_message}")
|
||||
update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
|
||||
|
||||
try:
|
||||
await asyncio.gather(*[_run_one(i, u) for i, u in enumerate(task_uuids)], return_exceptions=True)
|
||||
if not task_manager.is_batch_cancelled(batch_id):
|
||||
add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}")
|
||||
update_batch_status(finished=True, status="completed")
|
||||
else:
|
||||
update_batch_status(finished=True, status="cancelled")
|
||||
except Exception as e:
|
||||
logger.error(f"批量任务 {batch_id} 异常: {e}")
|
||||
add_batch_log(f"[错误] 批量任务异常: {str(e)}")
|
||||
update_batch_status(finished=True, status="failed")
|
||||
finally:
|
||||
batch_tasks[batch_id]["finished"] = True
|
||||
|
||||
|
||||
async def run_batch_pipeline(
|
||||
batch_id: str,
|
||||
task_uuids: List[str],
|
||||
email_service_type: str,
|
||||
proxy: Optional[str],
|
||||
email_service_config: Optional[dict],
|
||||
email_service_id: Optional[int],
|
||||
interval_min: int,
|
||||
interval_max: int,
|
||||
concurrency: int
|
||||
):
|
||||
"""
|
||||
流水线模式:每隔 interval 秒启动一个新任务,Semaphore 限制最大并发数
|
||||
"""
|
||||
_init_batch_state(batch_id, task_uuids)
|
||||
add_batch_log, update_batch_status = _make_batch_helpers(batch_id)
|
||||
semaphore = asyncio.Semaphore(concurrency)
|
||||
running_tasks_list = []
|
||||
add_batch_log(f"[系统] 流水线模式启动,并发数: {concurrency},总任务: {len(task_uuids)}")
|
||||
|
||||
async def _run_and_release(idx: int, uuid: str, pfx: str):
|
||||
try:
|
||||
await run_registration_task(
|
||||
uuid, email_service_type, proxy, email_service_config, email_service_id,
|
||||
log_prefix=pfx
|
||||
)
|
||||
with get_db() as db:
|
||||
t = crud.get_registration_task(db, uuid)
|
||||
if t:
|
||||
new_completed = batch_tasks[batch_id]["completed"] + 1
|
||||
new_success = batch_tasks[batch_id]["success"]
|
||||
new_failed = batch_tasks[batch_id]["failed"]
|
||||
if t.status == "completed":
|
||||
new_success += 1
|
||||
add_batch_log(f"{pfx} [成功] 注册成功")
|
||||
elif t.status == "failed":
|
||||
new_failed += 1
|
||||
add_batch_log(f"{pfx} [失败] 注册失败: {t.error_message}")
|
||||
update_batch_status(completed=new_completed, success=new_success, failed=new_failed)
|
||||
finally:
|
||||
semaphore.release()
|
||||
|
||||
try:
|
||||
for i, task_uuid in enumerate(task_uuids):
|
||||
if task_manager.is_batch_cancelled(batch_id) or batch_tasks[batch_id]["cancelled"]:
|
||||
with get_db() as db:
|
||||
for remaining_uuid in task_uuids[i:]:
|
||||
crud.update_registration_task(db, remaining_uuid, status="cancelled")
|
||||
add_batch_log("[取消] 批量任务已取消")
|
||||
update_batch_status(finished=True, status="cancelled")
|
||||
break
|
||||
|
||||
update_batch_status(current_index=i)
|
||||
await semaphore.acquire()
|
||||
prefix = f"[任务{i + 1}]"
|
||||
add_batch_log(f"{prefix} 开始注册...")
|
||||
t = asyncio.create_task(_run_and_release(i, task_uuid, prefix))
|
||||
running_tasks_list.append(t)
|
||||
|
||||
if i < len(task_uuids) - 1 and not task_manager.is_batch_cancelled(batch_id):
|
||||
wait_time = random.randint(interval_min, interval_max)
|
||||
logger.info(f"批量任务 {batch_id}: 等待 {wait_time} 秒后启动下一个任务")
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
if running_tasks_list:
|
||||
await asyncio.gather(*running_tasks_list, return_exceptions=True)
|
||||
|
||||
if not task_manager.is_batch_cancelled(batch_id):
|
||||
add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}")
|
||||
update_batch_status(finished=True, status="completed")
|
||||
except Exception as e:
|
||||
logger.error(f"批量任务 {batch_id} 异常: {e}")
|
||||
add_batch_log(f"[错误] 批量任务异常: {str(e)}")
|
||||
update_batch_status(finished=True, status="failed")
|
||||
finally:
|
||||
batch_tasks[batch_id]["finished"] = True
|
||||
|
||||
|
||||
async def run_batch_registration(
|
||||
batch_id: str,
|
||||
task_uuids: List[str],
|
||||
@@ -413,95 +580,22 @@ async def run_batch_registration(
|
||||
email_service_config: Optional[dict],
|
||||
email_service_id: Optional[int],
|
||||
interval_min: int,
|
||||
interval_max: int
|
||||
interval_max: int,
|
||||
concurrency: int = 1,
|
||||
mode: str = "pipeline"
|
||||
):
|
||||
"""
|
||||
异步执行批量注册任务
|
||||
|
||||
使用线程池执行每个注册任务,避免阻塞主事件循环
|
||||
"""
|
||||
# 初始化 TaskManager 批量任务(支持 WebSocket 推送)
|
||||
task_manager.init_batch(batch_id, len(task_uuids))
|
||||
|
||||
batch_tasks[batch_id] = {
|
||||
"total": len(task_uuids),
|
||||
"completed": 0,
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"cancelled": False,
|
||||
"task_uuids": task_uuids,
|
||||
"current_index": 0
|
||||
}
|
||||
|
||||
def add_batch_log(msg: str):
|
||||
batch_tasks[batch_id]["logs"] = batch_tasks[batch_id].get("logs", [])
|
||||
batch_tasks[batch_id]["logs"].append(msg)
|
||||
task_manager.add_batch_log(batch_id, msg)
|
||||
|
||||
def update_batch_status(**kwargs):
|
||||
for key, value in kwargs.items():
|
||||
if key in batch_tasks[batch_id]:
|
||||
batch_tasks[batch_id][key] = value
|
||||
task_manager.update_batch_status(batch_id, **kwargs)
|
||||
|
||||
try:
|
||||
for i, task_uuid in enumerate(task_uuids):
|
||||
# 检查是否已取消
|
||||
if task_manager.is_batch_cancelled(batch_id) or batch_tasks[batch_id]["cancelled"]:
|
||||
# 取消剩余任务
|
||||
with get_db() as db:
|
||||
for remaining_uuid in task_uuids[i:]:
|
||||
crud.update_registration_task(db, remaining_uuid, status="cancelled")
|
||||
add_batch_log(f"[取消] 批量任务已取消")
|
||||
update_batch_status(finished=True, status="cancelled")
|
||||
logger.info(f"批量任务 {batch_id} 已取消")
|
||||
break
|
||||
|
||||
update_batch_status(current_index=i)
|
||||
|
||||
# 运行单个注册任务(使用线程池)
|
||||
await run_registration_task(
|
||||
task_uuid, email_service_type, proxy, email_service_config, email_service_id
|
||||
)
|
||||
|
||||
# 更新统计
|
||||
with get_db() as db:
|
||||
task = crud.get_registration_task(db, task_uuid)
|
||||
if task:
|
||||
new_completed = batch_tasks[batch_id]["completed"] + 1
|
||||
new_success = batch_tasks[batch_id]["success"]
|
||||
new_failed = batch_tasks[batch_id]["failed"]
|
||||
|
||||
if task.status == "completed":
|
||||
new_success += 1
|
||||
add_batch_log(f"[成功] 第 {new_success} 个账号注册成功")
|
||||
elif task.status == "failed":
|
||||
new_failed += 1
|
||||
add_batch_log(f"[失败] 第 {new_failed} 个账号注册失败: {task.error_message}")
|
||||
|
||||
update_batch_status(
|
||||
completed=new_completed,
|
||||
success=new_success,
|
||||
failed=new_failed
|
||||
)
|
||||
|
||||
# 如果不是最后一个任务,等待随机间隔
|
||||
if i < len(task_uuids) - 1 and not task_manager.is_batch_cancelled(batch_id):
|
||||
wait_time = random.randint(interval_min, interval_max)
|
||||
logger.info(f"批量任务 {batch_id}: 等待 {wait_time} 秒后继续下一个任务")
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
if not task_manager.is_batch_cancelled(batch_id):
|
||||
add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}")
|
||||
update_batch_status(finished=True, status="completed")
|
||||
logger.info(f"批量任务 {batch_id} 完成: 成功 {batch_tasks[batch_id]['success']}, 失败 {batch_tasks[batch_id]['failed']}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"批量任务 {batch_id} 异常: {e}")
|
||||
add_batch_log(f"[错误] 批量任务异常: {str(e)}")
|
||||
update_batch_status(finished=True, status="failed")
|
||||
finally:
|
||||
batch_tasks[batch_id]["finished"] = True
|
||||
"""根据 mode 分发到并行或流水线执行"""
|
||||
if mode == "parallel":
|
||||
await run_batch_parallel(
|
||||
batch_id, task_uuids, email_service_type, proxy,
|
||||
email_service_config, email_service_id, concurrency
|
||||
)
|
||||
else:
|
||||
await run_batch_pipeline(
|
||||
batch_id, task_uuids, email_service_type, proxy,
|
||||
email_service_config, email_service_id,
|
||||
interval_min, interval_max, concurrency
|
||||
)
|
||||
|
||||
|
||||
# ============== API Endpoints ==============
|
||||
@@ -579,6 +673,12 @@ async def start_batch_registration(
|
||||
if request.interval_min < 0 or request.interval_max < request.interval_min:
|
||||
raise HTTPException(status_code=400, detail="间隔时间参数无效")
|
||||
|
||||
if not 1 <= request.concurrency <= 50:
|
||||
raise HTTPException(status_code=400, detail="并发数必须在 1-50 之间")
|
||||
|
||||
if request.mode not in ("parallel", "pipeline"):
|
||||
raise HTTPException(status_code=400, detail="模式必须为 parallel 或 pipeline")
|
||||
|
||||
# 创建批量任务
|
||||
batch_id = str(uuid.uuid4())
|
||||
task_uuids = []
|
||||
@@ -607,7 +707,9 @@ async def start_batch_registration(
|
||||
request.email_service_config,
|
||||
request.email_service_id,
|
||||
request.interval_min,
|
||||
request.interval_max
|
||||
request.interval_max,
|
||||
request.concurrency,
|
||||
request.mode
|
||||
)
|
||||
|
||||
return BatchRegistrationResponse(
|
||||
@@ -903,168 +1005,52 @@ async def get_outlook_accounts_for_registration():
|
||||
)
|
||||
|
||||
|
||||
def _run_sync_outlook_batch_registration(
|
||||
batch_id: str,
|
||||
service_ids: List[int],
|
||||
skip_registered: bool,
|
||||
proxy: Optional[str],
|
||||
interval_min: int,
|
||||
interval_max: int
|
||||
):
|
||||
"""
|
||||
在线程池中执行的同步 Outlook 批量注册任务
|
||||
"""
|
||||
from ...database.models import EmailService as EmailServiceModel
|
||||
from ...database.models import Account
|
||||
|
||||
# 初始化 TaskManager 批量任务
|
||||
task_manager.init_batch(batch_id, len(service_ids))
|
||||
|
||||
# 兼容旧版 batch_tasks(用于 REST API 轮询降级)
|
||||
batch_tasks[batch_id] = {
|
||||
"total": len(service_ids),
|
||||
"completed": 0,
|
||||
"success": 0,
|
||||
"failed": 0,
|
||||
"skipped": 0,
|
||||
"cancelled": False,
|
||||
"service_ids": service_ids,
|
||||
"current_index": 0,
|
||||
"logs": []
|
||||
}
|
||||
|
||||
def add_batch_log(msg: str):
|
||||
"""同时添加日志到两个系统"""
|
||||
batch_tasks[batch_id]["logs"].append(msg)
|
||||
task_manager.add_batch_log(batch_id, msg)
|
||||
|
||||
def update_batch_status(**kwargs):
|
||||
"""同时更新两个系统的状态"""
|
||||
for key, value in kwargs.items():
|
||||
if key in batch_tasks[batch_id]:
|
||||
batch_tasks[batch_id][key] = value
|
||||
task_manager.update_batch_status(batch_id, **kwargs)
|
||||
|
||||
try:
|
||||
for i, service_id in enumerate(service_ids):
|
||||
# 检查是否已取消
|
||||
if task_manager.is_batch_cancelled(batch_id):
|
||||
add_batch_log(f"[取消] 批量任务已取消")
|
||||
update_batch_status(finished=True, status="cancelled")
|
||||
logger.info(f"Outlook 批量任务 {batch_id} 已取消")
|
||||
break
|
||||
|
||||
update_batch_status(current_index=i)
|
||||
|
||||
with get_db() as db:
|
||||
# 获取邮箱服务
|
||||
service = db.query(EmailServiceModel).filter(
|
||||
EmailServiceModel.id == service_id
|
||||
).first()
|
||||
|
||||
if not service:
|
||||
add_batch_log(f"[跳过] 服务 ID {service_id} 不存在")
|
||||
update_batch_status(skipped=batch_tasks[batch_id]["skipped"] + 1,
|
||||
completed=batch_tasks[batch_id]["completed"] + 1)
|
||||
continue
|
||||
|
||||
config = service.config or {}
|
||||
email = config.get("email") or service.name
|
||||
|
||||
# 检查是否已注册
|
||||
if skip_registered:
|
||||
existing_account = db.query(Account).filter(
|
||||
Account.email == email
|
||||
).first()
|
||||
|
||||
if existing_account:
|
||||
add_batch_log(f"[跳过] {email} 已注册 (账号 ID: {existing_account.id})")
|
||||
update_batch_status(skipped=batch_tasks[batch_id]["skipped"] + 1,
|
||||
completed=batch_tasks[batch_id]["completed"] + 1)
|
||||
continue
|
||||
|
||||
# 创建注册任务
|
||||
task_uuid = str(uuid.uuid4())
|
||||
task = crud.create_registration_task(
|
||||
db,
|
||||
task_uuid=task_uuid,
|
||||
proxy=proxy,
|
||||
email_service_id=service_id
|
||||
)
|
||||
|
||||
add_batch_log(f"[注册] 开始注册 {email}...")
|
||||
|
||||
# 同步执行注册任务
|
||||
_run_sync_registration_task(task_uuid, "outlook", proxy, None, service_id)
|
||||
|
||||
# 更新统计
|
||||
with get_db() as db:
|
||||
task = crud.get_registration_task(db, task_uuid)
|
||||
if task:
|
||||
new_completed = batch_tasks[batch_id]["completed"] + 1
|
||||
new_success = batch_tasks[batch_id]["success"]
|
||||
new_failed = batch_tasks[batch_id]["failed"]
|
||||
|
||||
if task.status == "completed":
|
||||
new_success += 1
|
||||
add_batch_log(f"[成功] {email} 注册成功")
|
||||
elif task.status == "failed":
|
||||
new_failed += 1
|
||||
add_batch_log(f"[失败] {email} 注册失败: {task.error_message}")
|
||||
|
||||
update_batch_status(
|
||||
completed=new_completed,
|
||||
success=new_success,
|
||||
failed=new_failed
|
||||
)
|
||||
|
||||
# 如果不是最后一个任务,等待随机间隔
|
||||
if i < len(service_ids) - 1 and not task_manager.is_batch_cancelled(batch_id):
|
||||
wait_time = random.randint(interval_min, interval_max)
|
||||
logger.info(f"Outlook 批量任务 {batch_id}: 等待 {wait_time} 秒后继续下一个任务")
|
||||
import time
|
||||
time.sleep(wait_time)
|
||||
|
||||
# 完成批量任务
|
||||
if not task_manager.is_batch_cancelled(batch_id):
|
||||
add_batch_log(f"[完成] 批量任务完成!成功: {batch_tasks[batch_id]['success']}, 失败: {batch_tasks[batch_id]['failed']}, 跳过: {batch_tasks[batch_id]['skipped']}")
|
||||
update_batch_status(finished=True, status="completed")
|
||||
logger.info(f"Outlook 批量任务 {batch_id} 完成: 成功 {batch_tasks[batch_id]['success']}, 失败 {batch_tasks[batch_id]['failed']}, 跳过 {batch_tasks[batch_id]['skipped']}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Outlook 批量任务 {batch_id} 异常: {e}")
|
||||
add_batch_log(f"[错误] 批量任务异常: {str(e)}")
|
||||
update_batch_status(finished=True, status="failed")
|
||||
|
||||
|
||||
async def run_outlook_batch_registration(
|
||||
batch_id: str,
|
||||
service_ids: List[int],
|
||||
skip_registered: bool,
|
||||
proxy: Optional[str],
|
||||
interval_min: int,
|
||||
interval_max: int
|
||||
interval_max: int,
|
||||
concurrency: int = 1,
|
||||
mode: str = "pipeline"
|
||||
):
|
||||
"""
|
||||
异步执行 Outlook 批量注册任务
|
||||
异步执行 Outlook 批量注册任务,复用通用并发逻辑
|
||||
|
||||
使用线程池执行,避免阻塞主事件循环
|
||||
将每个 service_id 映射为一个独立的 task_uuid,然后调用
|
||||
run_batch_registration 的并发逻辑
|
||||
"""
|
||||
loop = task_manager.get_loop()
|
||||
if loop is None:
|
||||
loop = asyncio.get_event_loop()
|
||||
task_manager.set_loop(loop)
|
||||
|
||||
# 在线程池中执行
|
||||
await loop.run_in_executor(
|
||||
task_manager.executor,
|
||||
_run_sync_outlook_batch_registration,
|
||||
batch_id,
|
||||
service_ids,
|
||||
skip_registered,
|
||||
proxy,
|
||||
interval_min,
|
||||
interval_max
|
||||
# 预先为每个 service_id 创建注册任务记录
|
||||
task_uuids = []
|
||||
with get_db() as db:
|
||||
for service_id in service_ids:
|
||||
task_uuid = str(uuid.uuid4())
|
||||
crud.create_registration_task(
|
||||
db,
|
||||
task_uuid=task_uuid,
|
||||
proxy=proxy,
|
||||
email_service_id=service_id
|
||||
)
|
||||
task_uuids.append(task_uuid)
|
||||
|
||||
# 复用通用并发逻辑(outlook 服务类型,每个任务通过 email_service_id 定位账户)
|
||||
await run_batch_registration(
|
||||
batch_id=batch_id,
|
||||
task_uuids=task_uuids,
|
||||
email_service_type="outlook",
|
||||
proxy=proxy,
|
||||
email_service_config=None,
|
||||
email_service_id=None, # 每个任务已绑定了独立的 email_service_id
|
||||
interval_min=interval_min,
|
||||
interval_max=interval_max,
|
||||
concurrency=concurrency,
|
||||
mode=mode
|
||||
)
|
||||
|
||||
|
||||
@@ -1092,6 +1078,12 @@ async def start_outlook_batch_registration(
|
||||
if request.interval_min < 0 or request.interval_max < request.interval_min:
|
||||
raise HTTPException(status_code=400, detail="间隔时间参数无效")
|
||||
|
||||
if not 1 <= request.concurrency <= 50:
|
||||
raise HTTPException(status_code=400, detail="并发数必须在 1-50 之间")
|
||||
|
||||
if request.mode not in ("parallel", "pipeline"):
|
||||
raise HTTPException(status_code=400, detail="模式必须为 parallel 或 pipeline")
|
||||
|
||||
# 过滤掉已注册的邮箱
|
||||
actual_service_ids = request.service_ids
|
||||
skipped_count = 0
|
||||
@@ -1154,7 +1146,9 @@ async def start_outlook_batch_registration(
|
||||
request.skip_registered,
|
||||
request.proxy,
|
||||
request.interval_min,
|
||||
request.interval_max
|
||||
request.interval_max,
|
||||
request.concurrency,
|
||||
request.mode
|
||||
)
|
||||
|
||||
return OutlookBatchRegistrationResponse(
|
||||
|
||||
@@ -13,8 +13,8 @@ from datetime import datetime
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 全局线程池
|
||||
_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="reg_worker")
|
||||
# 全局线程池(支持最多 50 个并发注册任务)
|
||||
_executor = ThreadPoolExecutor(max_workers=50, thread_name_prefix="reg_worker")
|
||||
|
||||
# 任务日志队列 (task_uuid -> list of logs)
|
||||
_log_queues: Dict[str, List[str]] = defaultdict(list)
|
||||
@@ -344,10 +344,11 @@ class TaskManager:
|
||||
_ws_sent_index[key].pop(id(websocket), None)
|
||||
logger.info(f"批量任务 WebSocket 连接已注销: {batch_id}")
|
||||
|
||||
def create_log_callback(self, task_uuid: str) -> Callable[[str], None]:
|
||||
"""创建日志回调函数"""
|
||||
def create_log_callback(self, task_uuid: str, prefix: str = "") -> Callable[[str], None]:
|
||||
"""创建日志回调函数,可附加任务编号前缀"""
|
||||
def callback(msg: str):
|
||||
self.add_log(task_uuid, msg)
|
||||
full_msg = f"{prefix} {msg}" if prefix else msg
|
||||
self.add_log(task_uuid, full_msg)
|
||||
return callback
|
||||
|
||||
def create_check_cancelled_callback(self, task_uuid: str) -> Callable[[], bool]:
|
||||
|
||||
@@ -71,7 +71,16 @@ const elements = {
|
||||
outlookAccountsContainer: document.getElementById('outlook-accounts-container'),
|
||||
outlookIntervalMin: document.getElementById('outlook-interval-min'),
|
||||
outlookIntervalMax: document.getElementById('outlook-interval-max'),
|
||||
outlookSkipRegistered: document.getElementById('outlook-skip-registered')
|
||||
outlookSkipRegistered: document.getElementById('outlook-skip-registered'),
|
||||
outlookConcurrencyMode: document.getElementById('outlook-concurrency-mode'),
|
||||
outlookConcurrencyCount: document.getElementById('outlook-concurrency-count'),
|
||||
outlookConcurrencyHint: document.getElementById('outlook-concurrency-hint'),
|
||||
outlookIntervalGroup: document.getElementById('outlook-interval-group'),
|
||||
// 批量并发控件
|
||||
concurrencyMode: document.getElementById('concurrency-mode'),
|
||||
concurrencyCount: document.getElementById('concurrency-count'),
|
||||
concurrencyHint: document.getElementById('concurrency-hint'),
|
||||
intervalGroup: document.getElementById('interval-group')
|
||||
};
|
||||
|
||||
// 初始化
|
||||
@@ -109,6 +118,14 @@ function initEventListeners() {
|
||||
loadRecentAccounts();
|
||||
toast.info('已刷新');
|
||||
});
|
||||
|
||||
// 并发模式切换
|
||||
elements.concurrencyMode.addEventListener('change', () => {
|
||||
handleConcurrencyModeChange(elements.concurrencyMode, elements.concurrencyHint, elements.intervalGroup);
|
||||
});
|
||||
elements.outlookConcurrencyMode.addEventListener('change', () => {
|
||||
handleConcurrencyModeChange(elements.outlookConcurrencyMode, elements.outlookConcurrencyHint, elements.outlookIntervalGroup);
|
||||
});
|
||||
}
|
||||
|
||||
// 加载可用的邮箱服务
|
||||
@@ -261,6 +278,18 @@ function handleModeChange(e) {
|
||||
elements.batchOptions.style.display = isBatchMode ? 'block' : 'none';
|
||||
}
|
||||
|
||||
// 并发模式切换(批量)
|
||||
function handleConcurrencyModeChange(selectEl, hintEl, intervalGroupEl) {
|
||||
const mode = selectEl.value;
|
||||
if (mode === 'parallel') {
|
||||
hintEl.textContent = '所有任务分成 N 个并发批次同时执行';
|
||||
intervalGroupEl.style.display = 'none';
|
||||
} else {
|
||||
hintEl.textContent = '同时最多运行 N 个任务,每隔 interval 秒启动新任务';
|
||||
intervalGroupEl.style.display = 'block';
|
||||
}
|
||||
}
|
||||
|
||||
// 开始注册
|
||||
async function handleStartRegistration(e) {
|
||||
e.preventDefault();
|
||||
@@ -472,10 +501,14 @@ async function handleBatchRegistration(requestData) {
|
||||
const count = parseInt(elements.batchCount.value) || 5;
|
||||
const intervalMin = parseInt(elements.intervalMin.value) || 5;
|
||||
const intervalMax = parseInt(elements.intervalMax.value) || 30;
|
||||
const concurrency = parseInt(elements.concurrencyCount.value) || 3;
|
||||
const mode = elements.concurrencyMode.value || 'pipeline';
|
||||
|
||||
requestData.count = count;
|
||||
requestData.interval_min = intervalMin;
|
||||
requestData.interval_max = intervalMax;
|
||||
requestData.concurrency = Math.min(50, Math.max(1, concurrency));
|
||||
requestData.mode = mode;
|
||||
|
||||
addLog('info', `[系统] 正在启动批量注册任务 (数量: ${count})...`);
|
||||
|
||||
@@ -966,6 +999,8 @@ async function handleOutlookBatchRegistration() {
|
||||
const intervalMin = parseInt(elements.outlookIntervalMin.value) || 5;
|
||||
const intervalMax = parseInt(elements.outlookIntervalMax.value) || 30;
|
||||
const skipRegistered = elements.outlookSkipRegistered.checked;
|
||||
const concurrency = parseInt(elements.outlookConcurrencyCount.value) || 3;
|
||||
const mode = elements.outlookConcurrencyMode.value || 'pipeline';
|
||||
|
||||
// 禁用开始按钮
|
||||
elements.startBtn.disabled = true;
|
||||
@@ -978,7 +1013,9 @@ async function handleOutlookBatchRegistration() {
|
||||
service_ids: selectedIds,
|
||||
skip_registered: skipRegistered,
|
||||
interval_min: intervalMin,
|
||||
interval_max: intervalMax
|
||||
interval_max: intervalMax,
|
||||
concurrency: Math.min(50, Math.max(1, concurrency)),
|
||||
mode: mode
|
||||
};
|
||||
|
||||
addLog('info', `[系统] 正在启动 Outlook 批量注册 (${selectedIds.length} 个账户)...`);
|
||||
|
||||
@@ -150,12 +150,26 @@
|
||||
</div>
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label for="outlook-interval-min">最小间隔 (秒)</label>
|
||||
<input type="number" id="outlook-interval-min" name="outlook_interval_min" min="0" max="300" value="5">
|
||||
<label for="outlook-concurrency-mode">并发模式</label>
|
||||
<select id="outlook-concurrency-mode" name="outlook_concurrency_mode">
|
||||
<option value="pipeline">流水线(Pipeline)</option>
|
||||
<option value="parallel">并行(Parallel)</option>
|
||||
</select>
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label for="outlook-interval-max">最大间隔 (秒)</label>
|
||||
<input type="number" id="outlook-interval-max" name="outlook_interval_max" min="1" max="600" value="30">
|
||||
<label for="outlook-concurrency-count">并发数 (1-50)</label>
|
||||
<input type="number" id="outlook-concurrency-count" name="outlook_concurrency_count" min="1" max="50" value="3">
|
||||
<small id="outlook-concurrency-hint" style="color: var(--text-muted); font-size: 0.75rem;">同时最多运行 N 个任务,每隔 interval 秒启动新任务</small>
|
||||
</div>
|
||||
<div id="outlook-interval-group">
|
||||
<div class="form-group">
|
||||
<label for="outlook-interval-min">最小间隔 (秒)</label>
|
||||
<input type="number" id="outlook-interval-min" name="outlook_interval_min" min="0" max="300" value="5">
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label for="outlook-interval-max">最大间隔 (秒)</label>
|
||||
<input type="number" id="outlook-interval-max" name="outlook_interval_max" min="1" max="600" value="30">
|
||||
</div>
|
||||
</div>
|
||||
<div class="form-group">
|
||||
<label style="display: flex; align-items: center; gap: var(--spacing-sm); cursor: pointer;">
|
||||
@@ -180,13 +194,29 @@
|
||||
|
||||
<div id="batch-options" style="display: none;">
|
||||
<div class="form-group">
|
||||
<label for="interval-min">最小间隔 (秒)</label>
|
||||
<input type="number" id="interval-min" name="interval_min" min="0" max="300" value="5">
|
||||
<label for="concurrency-mode">并发模式</label>
|
||||
<select id="concurrency-mode" name="concurrency_mode">
|
||||
<option value="pipeline">流水线(Pipeline)</option>
|
||||
<option value="parallel">并行(Parallel)</option>
|
||||
</select>
|
||||
</div>
|
||||
|
||||
<div class="form-group">
|
||||
<label for="interval-max">最大间隔 (秒)</label>
|
||||
<input type="number" id="interval-max" name="interval_max" min="1" max="600" value="30">
|
||||
<label for="concurrency-count">并发数 (1-50)</label>
|
||||
<input type="number" id="concurrency-count" name="concurrency_count" min="1" max="50" value="3">
|
||||
<small id="concurrency-hint" style="color: var(--text-muted); font-size: 0.75rem;">同时最多运行 N 个任务,每隔 interval 秒启动新任务</small>
|
||||
</div>
|
||||
|
||||
<div id="interval-group">
|
||||
<div class="form-group">
|
||||
<label for="interval-min">最小间隔 (秒)</label>
|
||||
<input type="number" id="interval-min" name="interval_min" min="0" max="300" value="5">
|
||||
</div>
|
||||
|
||||
<div class="form-group">
|
||||
<label for="interval-max">最大间隔 (秒)</label>
|
||||
<input type="number" id="interval-max" name="interval_max" min="1" max="600" value="30">
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
|
||||
Reference in New Issue
Block a user