mirror of
https://github.com/cnlimiter/codex-register.git
synced 2026-05-13 17:29:43 +08:00
feat(cpa): 支持多cpa服务
This commit is contained in:
@@ -39,33 +39,48 @@ def generate_token_json(account: Account) -> dict:
|
||||
}
|
||||
|
||||
|
||||
def upload_to_cpa(token_data: dict, proxy: str = None) -> Tuple[bool, str]:
|
||||
def upload_to_cpa(
|
||||
token_data: dict,
|
||||
proxy: str = None,
|
||||
api_url: str = None,
|
||||
api_token: str = None,
|
||||
) -> Tuple[bool, str]:
|
||||
"""
|
||||
上传单个账号到 CPA 管理平台(不走代理)
|
||||
|
||||
Args:
|
||||
token_data: Token JSON 数据
|
||||
proxy: 保留参数,不使用(CPA 上传始终直连)
|
||||
api_url: 指定 CPA API URL(优先于全局配置)
|
||||
api_token: 指定 CPA API Token(优先于全局配置)
|
||||
|
||||
Returns:
|
||||
(成功标志, 消息或错误信息)
|
||||
"""
|
||||
settings = get_settings()
|
||||
|
||||
if not settings.cpa_enabled:
|
||||
# 优先使用传入的参数,否则退回全局配置
|
||||
effective_url = api_url or settings.cpa_api_url
|
||||
effective_token = api_token or (settings.cpa_api_token.get_secret_value() if settings.cpa_api_token else "")
|
||||
|
||||
# 仅当未指定服务时才检查全局启用开关
|
||||
if not api_url and not settings.cpa_enabled:
|
||||
return False, "CPA 上传未启用"
|
||||
|
||||
if not settings.cpa_api_url:
|
||||
if not effective_url:
|
||||
return False, "CPA API URL 未配置"
|
||||
|
||||
api_url = settings.cpa_api_url.rstrip("/")
|
||||
if not effective_token:
|
||||
return False, "CPA API Token 未配置"
|
||||
|
||||
api_url = effective_url.rstrip("/")
|
||||
upload_url = f"{api_url}/v0/management/auth-files"
|
||||
|
||||
filename = f"{token_data['email']}.json"
|
||||
file_content = json.dumps(token_data, ensure_ascii=False, indent=2).encode("utf-8")
|
||||
|
||||
headers = {
|
||||
"Authorization": f"Bearer {settings.cpa_api_token.get_secret_value()}",
|
||||
"Authorization": f"Bearer {effective_token}",
|
||||
}
|
||||
|
||||
try:
|
||||
@@ -103,13 +118,20 @@ def upload_to_cpa(token_data: dict, proxy: str = None) -> Tuple[bool, str]:
|
||||
return False, f"上传异常: {str(e)}"
|
||||
|
||||
|
||||
def batch_upload_to_cpa(account_ids: List[int], proxy: str = None) -> dict:
|
||||
def batch_upload_to_cpa(
|
||||
account_ids: List[int],
|
||||
proxy: str = None,
|
||||
api_url: str = None,
|
||||
api_token: str = None,
|
||||
) -> dict:
|
||||
"""
|
||||
批量上传账号到 CPA 管理平台
|
||||
|
||||
Args:
|
||||
account_ids: 账号 ID 列表
|
||||
proxy: 可选的代理 URL
|
||||
api_url: 指定 CPA API URL(优先于全局配置)
|
||||
api_token: 指定 CPA API Token(优先于全局配置)
|
||||
|
||||
Returns:
|
||||
包含成功/失败统计和详情的字典
|
||||
@@ -150,7 +172,7 @@ def batch_upload_to_cpa(account_ids: List[int], proxy: str = None) -> dict:
|
||||
token_data = generate_token_json(account)
|
||||
|
||||
# 上传
|
||||
success, message = upload_to_cpa(token_data, proxy)
|
||||
success, message = upload_to_cpa(token_data, proxy, api_url=api_url, api_token=api_token)
|
||||
|
||||
if success:
|
||||
# 更新数据库状态
|
||||
|
||||
@@ -7,7 +7,7 @@ from datetime import datetime, timedelta
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_, or_, desc, asc, func
|
||||
|
||||
from .models import Account, EmailService, RegistrationTask, Setting, Proxy
|
||||
from .models import Account, EmailService, RegistrationTask, Setting, Proxy, CpaService
|
||||
|
||||
|
||||
# ============================================================================
|
||||
@@ -497,4 +497,73 @@ def get_proxies_count(db: Session, enabled: Optional[bool] = None) -> int:
|
||||
query = db.query(func.count(Proxy.id))
|
||||
if enabled is not None:
|
||||
query = query.filter(Proxy.enabled == enabled)
|
||||
return query.scalar()
|
||||
return query.scalar()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# CPA 服务 CRUD
|
||||
# ============================================================================
|
||||
|
||||
def create_cpa_service(
|
||||
db: Session,
|
||||
name: str,
|
||||
api_url: str,
|
||||
api_token: str,
|
||||
enabled: bool = True,
|
||||
priority: int = 0
|
||||
) -> CpaService:
|
||||
"""创建 CPA 服务配置"""
|
||||
db_service = CpaService(
|
||||
name=name,
|
||||
api_url=api_url,
|
||||
api_token=api_token,
|
||||
enabled=enabled,
|
||||
priority=priority
|
||||
)
|
||||
db.add(db_service)
|
||||
db.commit()
|
||||
db.refresh(db_service)
|
||||
return db_service
|
||||
|
||||
|
||||
def get_cpa_service_by_id(db: Session, service_id: int) -> Optional[CpaService]:
|
||||
"""根据 ID 获取 CPA 服务"""
|
||||
return db.query(CpaService).filter(CpaService.id == service_id).first()
|
||||
|
||||
|
||||
def get_cpa_services(
|
||||
db: Session,
|
||||
enabled: Optional[bool] = None
|
||||
) -> List[CpaService]:
|
||||
"""获取 CPA 服务列表"""
|
||||
query = db.query(CpaService)
|
||||
if enabled is not None:
|
||||
query = query.filter(CpaService.enabled == enabled)
|
||||
return query.order_by(asc(CpaService.priority), asc(CpaService.id)).all()
|
||||
|
||||
|
||||
def update_cpa_service(
|
||||
db: Session,
|
||||
service_id: int,
|
||||
**kwargs
|
||||
) -> Optional[CpaService]:
|
||||
"""更新 CPA 服务配置"""
|
||||
db_service = get_cpa_service_by_id(db, service_id)
|
||||
if not db_service:
|
||||
return None
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(db_service, key):
|
||||
setattr(db_service, key, value)
|
||||
db.commit()
|
||||
db.refresh(db_service)
|
||||
return db_service
|
||||
|
||||
|
||||
def delete_cpa_service(db: Session, service_id: int) -> bool:
|
||||
"""删除 CPA 服务配置"""
|
||||
db_service = get_cpa_service_by_id(db, service_id)
|
||||
if not db_service:
|
||||
return False
|
||||
db.delete(db_service)
|
||||
db.commit()
|
||||
return True
|
||||
@@ -130,6 +130,20 @@ class Setting(Base):
|
||||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
||||
|
||||
|
||||
class CpaService(Base):
|
||||
"""CPA 服务配置表"""
|
||||
__tablename__ = 'cpa_services'
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
name = Column(String(100), nullable=False) # 服务名称
|
||||
api_url = Column(String(500), nullable=False) # API URL
|
||||
api_token = Column(Text, nullable=False) # API Token
|
||||
enabled = Column(Boolean, default=True)
|
||||
priority = Column(Integer, default=0) # 优先级
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
|
||||
|
||||
|
||||
class Proxy(Base):
|
||||
"""代理列表表"""
|
||||
__tablename__ = 'proxies'
|
||||
|
||||
@@ -112,6 +112,9 @@ class DatabaseSessionManager:
|
||||
("accounts", "cookies", "TEXT"),
|
||||
]
|
||||
|
||||
# 确保新表存在(create_tables 已处理,此处兜底)
|
||||
Base.metadata.create_all(bind=self.engine)
|
||||
|
||||
with self.engine.connect() as conn:
|
||||
for table_name, column_name, column_type in migrations:
|
||||
try:
|
||||
|
||||
@@ -9,6 +9,7 @@ from .registration import router as registration_router
|
||||
from .settings import router as settings_router
|
||||
from .email_services import router as email_services_router
|
||||
from .payment import router as payment_router
|
||||
from .cpa_services import router as cpa_services_router
|
||||
|
||||
api_router = APIRouter()
|
||||
|
||||
@@ -18,3 +19,4 @@ api_router.include_router(registration_router, prefix="/registration", tags=["re
|
||||
api_router.include_router(settings_router, prefix="/settings", tags=["settings"])
|
||||
api_router.include_router(email_services_router, prefix="/email-services", tags=["email-services"])
|
||||
api_router.include_router(payment_router, prefix="/payment", tags=["payment"])
|
||||
api_router.include_router(cpa_services_router, prefix="/cpa-services", tags=["cpa-services"])
|
||||
|
||||
@@ -700,6 +700,7 @@ async def batch_validate_tokens(request: BatchValidateRequest):
|
||||
class CPAUploadRequest(BaseModel):
|
||||
"""CPA 上传请求"""
|
||||
proxy: Optional[str] = None
|
||||
cpa_service_id: Optional[int] = None # 指定 CPA 服务 ID,不传则使用全局配置
|
||||
|
||||
|
||||
class BatchCPAUploadRequest(BaseModel):
|
||||
@@ -710,6 +711,7 @@ class BatchCPAUploadRequest(BaseModel):
|
||||
status_filter: Optional[str] = None
|
||||
email_service_filter: Optional[str] = None
|
||||
search_filter: Optional[str] = None
|
||||
cpa_service_id: Optional[int] = None # 指定 CPA 服务 ID,不传则使用全局配置
|
||||
|
||||
|
||||
@router.post("/{account_id}/upload-cpa")
|
||||
@@ -717,8 +719,19 @@ async def upload_account_to_cpa(account_id: int, request: CPAUploadRequest = Non
|
||||
"""上传单个账号到 CPA"""
|
||||
from ...core.cpa_upload import upload_to_cpa, generate_token_json
|
||||
|
||||
# 使用传入的代理或全局代理配置
|
||||
proxy = request.proxy if request and request.proxy else get_settings().proxy_url
|
||||
cpa_service_id = request.cpa_service_id if request else None
|
||||
|
||||
# 解析指定的 CPA 服务
|
||||
cpa_api_url = None
|
||||
cpa_api_token = None
|
||||
if cpa_service_id:
|
||||
with get_db() as db:
|
||||
svc = crud.get_cpa_service_by_id(db, cpa_service_id)
|
||||
if not svc:
|
||||
raise HTTPException(status_code=404, detail="指定的 CPA 服务不存在")
|
||||
cpa_api_url = svc.api_url
|
||||
cpa_api_token = svc.api_token
|
||||
|
||||
with get_db() as db:
|
||||
account = crud.get_account_by_id(db, account_id)
|
||||
@@ -735,23 +748,15 @@ async def upload_account_to_cpa(account_id: int, request: CPAUploadRequest = Non
|
||||
token_data = generate_token_json(account)
|
||||
|
||||
# 上传
|
||||
success, message = upload_to_cpa(token_data, proxy)
|
||||
success, message = upload_to_cpa(token_data, proxy, api_url=cpa_api_url, api_token=cpa_api_token)
|
||||
|
||||
if success:
|
||||
# 更新数据库状态
|
||||
account.cpa_uploaded = True
|
||||
account.cpa_uploaded_at = datetime.utcnow()
|
||||
db.commit()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": message
|
||||
}
|
||||
return {"success": True, "message": message}
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": message
|
||||
}
|
||||
return {"success": False, "error": message}
|
||||
|
||||
|
||||
@router.post("/batch-upload-cpa")
|
||||
@@ -759,15 +764,24 @@ async def batch_upload_accounts_to_cpa(request: BatchCPAUploadRequest):
|
||||
"""批量上传账号到 CPA"""
|
||||
from ...core.cpa_upload import batch_upload_to_cpa
|
||||
|
||||
# 使用传入的代理或全局代理配置
|
||||
proxy = request.proxy if request.proxy else get_settings().proxy_url
|
||||
|
||||
# 解析指定的 CPA 服务
|
||||
cpa_api_url = None
|
||||
cpa_api_token = None
|
||||
if request.cpa_service_id:
|
||||
with get_db() as db:
|
||||
svc = crud.get_cpa_service_by_id(db, request.cpa_service_id)
|
||||
if not svc:
|
||||
raise HTTPException(status_code=404, detail="指定的 CPA 服务不存在")
|
||||
cpa_api_url = svc.api_url
|
||||
cpa_api_token = svc.api_token
|
||||
|
||||
with get_db() as db:
|
||||
ids = resolve_account_ids(
|
||||
db, request.ids, request.select_all,
|
||||
request.status_filter, request.email_service_filter, request.search_filter
|
||||
)
|
||||
|
||||
results = batch_upload_to_cpa(ids, proxy)
|
||||
|
||||
results = batch_upload_to_cpa(ids, proxy, api_url=cpa_api_url, api_token=cpa_api_token)
|
||||
return results
|
||||
|
||||
171
src/web/routes/cpa_services.py
Normal file
171
src/web/routes/cpa_services.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""
|
||||
CPA 服务管理 API 路由
|
||||
"""
|
||||
|
||||
from typing import List, Optional
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ...database import crud
|
||||
from ...database.session import get_db
|
||||
from ...core.cpa_upload import test_cpa_connection
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# ============== Pydantic Models ==============
|
||||
|
||||
class CpaServiceCreate(BaseModel):
|
||||
name: str
|
||||
api_url: str
|
||||
api_token: str
|
||||
enabled: bool = True
|
||||
priority: int = 0
|
||||
|
||||
|
||||
class CpaServiceUpdate(BaseModel):
|
||||
name: Optional[str] = None
|
||||
api_url: Optional[str] = None
|
||||
api_token: Optional[str] = None
|
||||
enabled: Optional[bool] = None
|
||||
priority: Optional[int] = None
|
||||
|
||||
|
||||
class CpaServiceResponse(BaseModel):
|
||||
id: int
|
||||
name: str
|
||||
api_url: str
|
||||
has_token: bool
|
||||
enabled: bool
|
||||
priority: int
|
||||
created_at: Optional[str] = None
|
||||
updated_at: Optional[str] = None
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class CpaServiceTestRequest(BaseModel):
|
||||
api_url: Optional[str] = None
|
||||
api_token: Optional[str] = None
|
||||
|
||||
|
||||
def _to_response(svc) -> CpaServiceResponse:
|
||||
return CpaServiceResponse(
|
||||
id=svc.id,
|
||||
name=svc.name,
|
||||
api_url=svc.api_url,
|
||||
has_token=bool(svc.api_token),
|
||||
enabled=svc.enabled,
|
||||
priority=svc.priority,
|
||||
created_at=svc.created_at.isoformat() if svc.created_at else None,
|
||||
updated_at=svc.updated_at.isoformat() if svc.updated_at else None,
|
||||
)
|
||||
|
||||
|
||||
# ============== API Endpoints ==============
|
||||
|
||||
@router.get("", response_model=List[CpaServiceResponse])
|
||||
async def list_cpa_services(enabled: Optional[bool] = None):
|
||||
"""获取 CPA 服务列表"""
|
||||
with get_db() as db:
|
||||
services = crud.get_cpa_services(db, enabled=enabled)
|
||||
return [_to_response(s) for s in services]
|
||||
|
||||
|
||||
@router.post("", response_model=CpaServiceResponse)
|
||||
async def create_cpa_service(request: CpaServiceCreate):
|
||||
"""新增 CPA 服务"""
|
||||
with get_db() as db:
|
||||
service = crud.create_cpa_service(
|
||||
db,
|
||||
name=request.name,
|
||||
api_url=request.api_url,
|
||||
api_token=request.api_token,
|
||||
enabled=request.enabled,
|
||||
priority=request.priority,
|
||||
)
|
||||
return _to_response(service)
|
||||
|
||||
|
||||
@router.get("/{service_id}", response_model=CpaServiceResponse)
|
||||
async def get_cpa_service(service_id: int):
|
||||
"""获取单个 CPA 服务详情"""
|
||||
with get_db() as db:
|
||||
service = crud.get_cpa_service_by_id(db, service_id)
|
||||
if not service:
|
||||
raise HTTPException(status_code=404, detail="CPA 服务不存在")
|
||||
return _to_response(service)
|
||||
|
||||
|
||||
@router.get("/{service_id}/full")
|
||||
async def get_cpa_service_full(service_id: int):
|
||||
"""获取 CPA 服务完整配置(含 token)"""
|
||||
with get_db() as db:
|
||||
service = crud.get_cpa_service_by_id(db, service_id)
|
||||
if not service:
|
||||
raise HTTPException(status_code=404, detail="CPA 服务不存在")
|
||||
return {
|
||||
"id": service.id,
|
||||
"name": service.name,
|
||||
"api_url": service.api_url,
|
||||
"api_token": service.api_token,
|
||||
"enabled": service.enabled,
|
||||
"priority": service.priority,
|
||||
}
|
||||
|
||||
|
||||
@router.patch("/{service_id}", response_model=CpaServiceResponse)
|
||||
async def update_cpa_service(service_id: int, request: CpaServiceUpdate):
|
||||
"""更新 CPA 服务配置"""
|
||||
with get_db() as db:
|
||||
service = crud.get_cpa_service_by_id(db, service_id)
|
||||
if not service:
|
||||
raise HTTPException(status_code=404, detail="CPA 服务不存在")
|
||||
|
||||
update_data = {}
|
||||
if request.name is not None:
|
||||
update_data["name"] = request.name
|
||||
if request.api_url is not None:
|
||||
update_data["api_url"] = request.api_url
|
||||
# api_token 留空则保持原值
|
||||
if request.api_token:
|
||||
update_data["api_token"] = request.api_token
|
||||
if request.enabled is not None:
|
||||
update_data["enabled"] = request.enabled
|
||||
if request.priority is not None:
|
||||
update_data["priority"] = request.priority
|
||||
|
||||
service = crud.update_cpa_service(db, service_id, **update_data)
|
||||
return _to_response(service)
|
||||
|
||||
|
||||
@router.delete("/{service_id}")
|
||||
async def delete_cpa_service(service_id: int):
|
||||
"""删除 CPA 服务"""
|
||||
with get_db() as db:
|
||||
service = crud.get_cpa_service_by_id(db, service_id)
|
||||
if not service:
|
||||
raise HTTPException(status_code=404, detail="CPA 服务不存在")
|
||||
crud.delete_cpa_service(db, service_id)
|
||||
return {"success": True, "message": f"CPA 服务 {service.name} 已删除"}
|
||||
|
||||
|
||||
@router.post("/{service_id}/test")
|
||||
async def test_cpa_service(service_id: int):
|
||||
"""测试 CPA 服务连接"""
|
||||
with get_db() as db:
|
||||
service = crud.get_cpa_service_by_id(db, service_id)
|
||||
if not service:
|
||||
raise HTTPException(status_code=404, detail="CPA 服务不存在")
|
||||
success, message = test_cpa_connection(service.api_url, service.api_token)
|
||||
return {"success": success, "message": message}
|
||||
|
||||
|
||||
@router.post("/test-connection")
|
||||
async def test_cpa_connection_direct(request: CpaServiceTestRequest):
|
||||
"""直接测试 CPA 连接(用于添加前验证)"""
|
||||
if not request.api_url or not request.api_token:
|
||||
raise HTTPException(status_code=400, detail="api_url 和 api_token 不能为空")
|
||||
success, message = test_cpa_connection(request.api_url, request.api_token)
|
||||
return {"success": success, "message": message}
|
||||
@@ -72,6 +72,7 @@ class RegistrationTaskCreate(BaseModel):
|
||||
email_service_config: Optional[dict] = None
|
||||
email_service_id: Optional[int] = None # 使用数据库中已配置的邮箱服务 ID
|
||||
auto_upload_cpa: bool = False # 注册成功后自动上传到 CPA
|
||||
cpa_service_id: Optional[int] = None # 指定 CPA 服务 ID,不传则使用全局配置
|
||||
|
||||
|
||||
class BatchRegistrationRequest(BaseModel):
|
||||
@@ -86,6 +87,7 @@ class BatchRegistrationRequest(BaseModel):
|
||||
concurrency: int = 1 # 并发线程数 (1-50)
|
||||
mode: str = "pipeline" # 执行模式: "parallel" 或 "pipeline"
|
||||
auto_upload_cpa: bool = False # 注册成功后自动上传到 CPA
|
||||
cpa_service_id: Optional[int] = None # 指定 CPA 服务 ID,不传则使用全局配置
|
||||
|
||||
|
||||
class RegistrationTaskResponse(BaseModel):
|
||||
@@ -149,6 +151,7 @@ class OutlookBatchRegistrationRequest(BaseModel):
|
||||
concurrency: int = 1 # 并发线程数 (1-50)
|
||||
mode: str = "pipeline" # 执行模式: "parallel" 或 "pipeline"
|
||||
auto_upload_cpa: bool = False # 注册成功后自动上传到 CPA
|
||||
cpa_service_id: Optional[int] = None # 指定 CPA 服务 ID,不传则使用全局配置
|
||||
|
||||
|
||||
class OutlookBatchRegistrationResponse(BaseModel):
|
||||
@@ -179,7 +182,7 @@ def task_to_response(task: RegistrationTask) -> RegistrationTaskResponse:
|
||||
)
|
||||
|
||||
|
||||
def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = "", auto_upload_cpa: bool = False):
|
||||
def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = "", auto_upload_cpa: bool = False, cpa_service_id: Optional[int] = None):
|
||||
"""
|
||||
在线程池中执行的同步注册任务
|
||||
|
||||
@@ -344,7 +347,19 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
saved_account = db.query(AccountModel).filter_by(email=result.email).first()
|
||||
if saved_account and saved_account.access_token:
|
||||
token_data = generate_token_json(saved_account)
|
||||
cpa_success, cpa_msg = upload_to_cpa(token_data)
|
||||
# 解析指定 CPA 服务
|
||||
_cpa_api_url = None
|
||||
_cpa_api_token = None
|
||||
if cpa_service_id:
|
||||
try:
|
||||
_svc = crud.get_cpa_service_by_id(db, cpa_service_id)
|
||||
if _svc:
|
||||
_cpa_api_url = _svc.api_url
|
||||
_cpa_api_token = _svc.api_token
|
||||
log_callback(f"[CPA] 使用服务: {_svc.name}")
|
||||
except Exception:
|
||||
pass
|
||||
cpa_success, cpa_msg = upload_to_cpa(token_data, api_url=_cpa_api_url, api_token=_cpa_api_token)
|
||||
if cpa_success:
|
||||
saved_account.cpa_uploaded = True
|
||||
saved_account.cpa_uploaded_at = datetime.utcnow()
|
||||
@@ -399,7 +414,7 @@ def _run_sync_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
pass
|
||||
|
||||
|
||||
async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = "", auto_upload_cpa: bool = False):
|
||||
async def run_registration_task(task_uuid: str, email_service_type: str, proxy: Optional[str], email_service_config: Optional[dict], email_service_id: Optional[int] = None, log_prefix: str = "", batch_id: str = "", auto_upload_cpa: bool = False, cpa_service_id: Optional[int] = None):
|
||||
"""
|
||||
异步执行注册任务
|
||||
|
||||
@@ -426,7 +441,8 @@ async def run_registration_task(task_uuid: str, email_service_type: str, proxy:
|
||||
email_service_id,
|
||||
log_prefix,
|
||||
batch_id,
|
||||
auto_upload_cpa
|
||||
auto_upload_cpa,
|
||||
cpa_service_id
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"线程池执行异常: {task_uuid}, 错误: {e}")
|
||||
@@ -473,7 +489,8 @@ async def run_batch_parallel(
|
||||
email_service_config: Optional[dict],
|
||||
email_service_id: Optional[int],
|
||||
concurrency: int,
|
||||
auto_upload_cpa: bool = False
|
||||
auto_upload_cpa: bool = False,
|
||||
cpa_service_id: Optional[int] = None
|
||||
):
|
||||
"""
|
||||
并行模式:所有任务同时提交,Semaphore 控制最大并发数
|
||||
@@ -489,7 +506,8 @@ async def run_batch_parallel(
|
||||
async with semaphore:
|
||||
await run_registration_task(
|
||||
uuid, email_service_type, proxy, email_service_config, email_service_id,
|
||||
log_prefix=prefix, batch_id=batch_id, auto_upload_cpa=auto_upload_cpa
|
||||
log_prefix=prefix, batch_id=batch_id, auto_upload_cpa=auto_upload_cpa,
|
||||
cpa_service_id=cpa_service_id
|
||||
)
|
||||
with get_db() as db:
|
||||
t = crud.get_registration_task(db, uuid)
|
||||
@@ -531,7 +549,8 @@ async def run_batch_pipeline(
|
||||
interval_min: int,
|
||||
interval_max: int,
|
||||
concurrency: int,
|
||||
auto_upload_cpa: bool = False
|
||||
auto_upload_cpa: bool = False,
|
||||
cpa_service_id: Optional[int] = None
|
||||
):
|
||||
"""
|
||||
流水线模式:每隔 interval 秒启动一个新任务,Semaphore 限制最大并发数
|
||||
@@ -547,7 +566,8 @@ async def run_batch_pipeline(
|
||||
try:
|
||||
await run_registration_task(
|
||||
uuid, email_service_type, proxy, email_service_config, email_service_id,
|
||||
log_prefix=pfx, batch_id=batch_id, auto_upload_cpa=auto_upload_cpa
|
||||
log_prefix=pfx, batch_id=batch_id, auto_upload_cpa=auto_upload_cpa,
|
||||
cpa_service_id=cpa_service_id
|
||||
)
|
||||
with get_db() as db:
|
||||
t = crud.get_registration_task(db, uuid)
|
||||
@@ -613,21 +633,22 @@ async def run_batch_registration(
|
||||
interval_max: int,
|
||||
concurrency: int = 1,
|
||||
mode: str = "pipeline",
|
||||
auto_upload_cpa: bool = False
|
||||
auto_upload_cpa: bool = False,
|
||||
cpa_service_id: Optional[int] = None
|
||||
):
|
||||
"""根据 mode 分发到并行或流水线执行"""
|
||||
if mode == "parallel":
|
||||
await run_batch_parallel(
|
||||
batch_id, task_uuids, email_service_type, proxy,
|
||||
email_service_config, email_service_id, concurrency,
|
||||
auto_upload_cpa=auto_upload_cpa
|
||||
auto_upload_cpa=auto_upload_cpa, cpa_service_id=cpa_service_id
|
||||
)
|
||||
else:
|
||||
await run_batch_pipeline(
|
||||
batch_id, task_uuids, email_service_type, proxy,
|
||||
email_service_config, email_service_id,
|
||||
interval_min, interval_max, concurrency,
|
||||
auto_upload_cpa=auto_upload_cpa
|
||||
auto_upload_cpa=auto_upload_cpa, cpa_service_id=cpa_service_id
|
||||
)
|
||||
|
||||
|
||||
@@ -674,7 +695,8 @@ async def start_registration(
|
||||
request.email_service_id,
|
||||
"",
|
||||
"",
|
||||
request.auto_upload_cpa
|
||||
request.auto_upload_cpa,
|
||||
request.cpa_service_id
|
||||
)
|
||||
|
||||
return task_to_response(task)
|
||||
@@ -746,7 +768,8 @@ async def start_batch_registration(
|
||||
request.interval_max,
|
||||
request.concurrency,
|
||||
request.mode,
|
||||
request.auto_upload_cpa
|
||||
request.auto_upload_cpa,
|
||||
request.cpa_service_id
|
||||
)
|
||||
|
||||
return BatchRegistrationResponse(
|
||||
@@ -1075,7 +1098,8 @@ async def run_outlook_batch_registration(
|
||||
interval_max: int,
|
||||
concurrency: int = 1,
|
||||
mode: str = "pipeline",
|
||||
auto_upload_cpa: bool = False
|
||||
auto_upload_cpa: bool = False,
|
||||
cpa_service_id: Optional[int] = None
|
||||
):
|
||||
"""
|
||||
异步执行 Outlook 批量注册任务,复用通用并发逻辑
|
||||
@@ -1113,7 +1137,8 @@ async def run_outlook_batch_registration(
|
||||
interval_max=interval_max,
|
||||
concurrency=concurrency,
|
||||
mode=mode,
|
||||
auto_upload_cpa=auto_upload_cpa
|
||||
auto_upload_cpa=auto_upload_cpa,
|
||||
cpa_service_id=cpa_service_id
|
||||
)
|
||||
|
||||
|
||||
@@ -1212,7 +1237,8 @@ async def start_outlook_batch_registration(
|
||||
request.interval_max,
|
||||
request.concurrency,
|
||||
request.mode,
|
||||
request.auto_upload_cpa
|
||||
request.auto_upload_cpa,
|
||||
request.cpa_service_id
|
||||
)
|
||||
|
||||
return OutlookBatchRegistrationResponse(
|
||||
|
||||
Reference in New Issue
Block a user