diff --git a/DouYinSparkFlow/config.json b/DouYinSparkFlow/config.json index 0d0a0a2..48770ef 100644 --- a/DouYinSparkFlow/config.json +++ b/DouYinSparkFlow/config.json @@ -3,6 +3,9 @@ "taskCount": 1, "proxyAddress": "", "messageTemplate": "🤩今日火花+1\r\n", + "saveDebugArtifacts": false, + "useProtocolSender": false, + "protocolDryRun": false, "browserSenderAccounts": [ "94262577168", "抖音号:softwomen" @@ -36,4 +39,4 @@ "enabled": true, "messageTemplate": "\r\n" } -} +} \ No newline at end of file diff --git a/DouYinSparkFlow/core/tasks.py b/DouYinSparkFlow/core/tasks.py index 379c550..a4639c7 100644 --- a/DouYinSparkFlow/core/tasks.py +++ b/DouYinSparkFlow/core/tasks.py @@ -48,6 +48,10 @@ def _normalize_target_name(value): def _current_run_mode(): + if _manual_run_unsent_only(): + return "manual_unsent_only" + if _manual_run_failed_only(): + return "manual_failed_only" return "manual" if _is_manual_run() else "scheduled" @@ -196,8 +200,26 @@ async def scroll_and_select_user(page, account_name, targets): } found_usernames = set() remaining_targets = set(normalized_targets) + scan_started_at = asyncio.get_running_loop().time() + last_new_friend_at = scan_started_at + max_scan_seconds = 300 + idle_scan_seconds = 120 + + def missing_target_names(): + return sorted(normalized_targets[item] for item in remaining_targets) while True: + now_monotonic = asyncio.get_running_loop().time() + if now_monotonic - scan_started_at > max_scan_seconds: + logger.warning( + "Account %s friend list scan timed out after %ss. Missing targets: %s; scannedFriends=%s", + account_name, + max_scan_seconds, + missing_target_names(), + len(found_usernames), + ) + return + target_elements = await page.locator(target_selector).all() for element in target_elements: @@ -214,6 +236,7 @@ async def scroll_and_select_user(page, account_name, targets): if normalized_target_name in found_usernames: continue found_usernames.add(normalized_target_name) + last_new_friend_at = asyncio.get_running_loop().time() logger.debug("Account %s found friend entry %s", account_name, target_name) matched_target_name = normalized_targets.get(normalized_target_name) @@ -239,7 +262,18 @@ async def scroll_and_select_user(page, account_name, targets): logger.warning( "Account %s reached the end of the friend list. Missing targets: %s", account_name, - sorted(normalized_targets[item] for item in remaining_targets), + missing_target_names(), + ) + return + + now_monotonic = asyncio.get_running_loop().time() + if found_usernames and now_monotonic - last_new_friend_at > idle_scan_seconds: + logger.warning( + "Account %s friend list scan made no progress for %ss. Missing targets: %s; scannedFriends=%s", + account_name, + idle_scan_seconds, + missing_target_names(), + len(found_usernames), ) return @@ -317,6 +351,19 @@ def _manual_run_failed_only(): return _is_manual_run() and os.getenv("SPARKFLOW_MANUAL_FAILED_ONLY") == "1" +def _manual_run_unsent_only(): + return _is_manual_run() and os.getenv("SPARKFLOW_MANUAL_UNSENT_ONLY") == "1" + + +def _unsent_retry_max_attempts(): + raw_value = str(os.getenv("SPARKFLOW_UNSENT_RETRY_MAX_ATTEMPTS") or "3").strip() + try: + return max(1, int(raw_value)) + except ValueError: + logger.warning("Invalid SPARKFLOW_UNSENT_RETRY_MAX_ATTEMPTS=%r, using 3", raw_value) + return 3 + + def _target_sent_today(user, target_name, now): history = dict(user.get("message_history") or {}) entry = history.get(target_name) or {} @@ -331,6 +378,18 @@ def _target_failed_today(user, target_name, now): return bool(last_attempt_at and last_attempt_at.date() == now.date()) +def _target_failure_attempts_today(user, target_name, now): + queue = dict(user.get("failure_queue") or {}) + entry = queue.get(target_name) or {} + last_attempt_at = _parse_sent_at(entry.get("lastAttemptAt"), now.tzinfo) + if not last_attempt_at or last_attempt_at.date() != now.date(): + return 0 + try: + return int(entry.get("attemptCount") or 0) + except (TypeError, ValueError): + return 0 + + def _pending_failed_targets(user, now): queue = dict(user.get("failure_queue") or {}) targets = [] @@ -342,6 +401,21 @@ def _pending_failed_targets(user, now): return targets +def _pending_unsent_targets(user, now): + retry_targets = [] + skipped_targets = [] + max_attempts = _unsent_retry_max_attempts() + for target_name in user.get("targets") or []: + if _target_sent_today(user, target_name, now): + continue + attempts_today = _target_failure_attempts_today(user, target_name, now) + if attempts_today >= max_attempts: + skipped_targets.append(f"{target_name}({attempts_today})") + continue + retry_targets.append(target_name) + return retry_targets, skipped_targets + + def _scheduled_send_time(user, target_name, send_window, now): window_minutes = (send_window["endHour"] - send_window["startHour"]) * 60 start_of_window = now.replace( @@ -373,7 +447,8 @@ def _select_due_targets(user, send_window, now): second=0, microsecond=0, ) - if now < window_start or now > window_end: + window_grace_end = window_end + timedelta(minutes=send_window["scheduleIntervalMinutes"]) + if now < window_start or now > window_grace_end: already_sent = [] pending_targets = [] queued_failures = [] @@ -430,6 +505,29 @@ def _prepare_active_users_for_run(active_config, active_user_data): logger.info("No queued failures are pending for manual retry") return runnable_users + if _manual_run_unsent_only(): + logger.info( + "SPARKFLOW_MANUAL_RUN=1 and SPARKFLOW_MANUAL_UNSENT_ONLY=1, retrying today's unsent targets only" + ) + runnable_users = [] + for user in active_user_data: + retry_targets, skipped_targets = _pending_unsent_targets(user, now) + already_sent = [target for target in user.get("targets") or [] if _target_sent_today(user, target, now)] + logger.info( + "manual-unsent user=%s retryTargets=%s alreadySentToday=%s skippedMaxAttempts=%s", + user.get("username", "unknown"), + retry_targets, + already_sent, + skipped_targets, + ) + if retry_targets: + runnable_user = dict(user) + runnable_user["targets"] = retry_targets + runnable_users.append(runnable_user) + if not runnable_users: + logger.info("No unsent targets are pending for manual retry") + return runnable_users + if _is_manual_run(): logger.info("SPARKFLOW_MANUAL_RUN=1, bypassing daily send window") return [dict(user, targets=list(user.get("targets") or [])) for user in active_user_data] diff --git a/DouYinSparkFlow/webui/app.py b/DouYinSparkFlow/webui/app.py index 6e43fee..2d4eda9 100644 --- a/DouYinSparkFlow/webui/app.py +++ b/DouYinSparkFlow/webui/app.py @@ -39,7 +39,15 @@ from webui.auth import ( validate_csrf, verify_password, ) -from webui.ops import get_ops_snapshot, read_log_tail, refresh_proxy, restart_proxy, run_task_now, update_daily_schedule +from webui.ops import ( + get_ops_snapshot, + read_log_tail, + refresh_proxy, + restart_proxy, + run_task_now, + run_unsent_retry_now, + update_daily_schedule, +) BASE_DIR = Path(__file__).resolve().parent @@ -580,6 +588,23 @@ def create_app(): flash(request, f"Triggered a full resend run in the background (pid {pid}).", "success") return redirect("/") + @app.post("/ops/run-unsent") + async def run_unsent(request: Request): + maybe_redirect = require_user(request) + if maybe_redirect: + return maybe_redirect + + form = await request.form() + if not validate_csrf(request, str(form.get("csrf_token", ""))): + return Response("Invalid CSRF token", status_code=403) + + pid = run_unsent_retry_now() + if pid == -1: + flash(request, "Failed to start the unsent-target fallback run. Check server logs for details.", "error") + else: + flash(request, f"Triggered an unsent-target fallback run in the background (pid {pid}).", "success") + return redirect("/ops/send-console") + @app.post("/ops/proxy/refresh") async def proxy_refresh(request: Request): maybe_redirect = require_user(request) diff --git a/DouYinSparkFlow/webui/ops.py b/DouYinSparkFlow/webui/ops.py index 9a88442..54d4549 100644 --- a/DouYinSparkFlow/webui/ops.py +++ b/DouYinSparkFlow/webui/ops.py @@ -69,11 +69,31 @@ def build_task_run_spec(): return [sys.executable, "main.py", "--doTask"], repo_root() -def build_scheduled_task_command(): +def _env_shell_prefix(extra_env=None): + parts = [] + for key, value in (extra_env or {}).items(): + parts.append(f"{key}={shlex.quote(str(value))}") + return " ".join(parts) + + +def _with_env_prefix(command, extra_env=None): + env_prefix = _env_shell_prefix(extra_env) + return f"env {env_prefix} {command}" if env_prefix else command + + +def _compose_env_args(extra_env=None): + parts = [] + for key, value in (extra_env or {}).items(): + parts.extend(["-e", f"{key}={value}"]) + return " ".join(shlex.quote(part) for part in parts) + + +def build_scheduled_task_command(extra_env=None, trigger_label="scheduled send"): if running_in_container(): + task_command = _with_env_prefix("python main.py --doTask", extra_env) return ( "/bin/bash -lc 'timestamp=$(date -Iseconds); " - "echo \"[AUTO_TRIGGER] $timestamp scheduled send start\"; " + f"echo \"[AUTO_TRIGGER] $timestamp {trigger_label} start\"; " "container=$(docker ps --format \"{{.Names}}\" | " "grep -E \"^(douyin-web-hostfix|douyin-web)$\" | head -n 1); " "if [ -z \"$container\" ]; then " @@ -82,21 +102,35 @@ def build_scheduled_task_command(): "fi; " "echo \"[AUTO_TRIGGER] $timestamp container=$container\"; " "docker exec \"$container\" sh -lc " - "\"cd /app && python main.py --doTask\"'" + f"\"cd /app && {task_command}\"'" ) if compose_file_path(): compose_root_quoted = shlex.quote(str(compose_root())) + compose_env_args = _compose_env_args(extra_env) + compose_env_suffix = f" {compose_env_args}" if compose_env_args else "" return ( "/bin/bash -lc " - f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) compose task start\"; " - f"cd {compose_root_quoted} && /usr/bin/docker compose run --rm task'" + f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) compose {trigger_label} start\"; " + f"cd {compose_root_quoted} && /usr/bin/docker compose run --rm{compose_env_suffix} task'" ) repo_root_quoted = shlex.quote(str(repo_root())) python_quoted = shlex.quote(sys.executable) + task_command = _with_env_prefix(f"{python_quoted} main.py --doTask", extra_env) return ( "/bin/bash -lc " - f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) local task start\"; " - f"cd {repo_root_quoted} && {python_quoted} main.py --doTask'" + f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) local {trigger_label} start\"; " + f"cd {repo_root_quoted} && {task_command}'" + ) + + +def build_unsent_fallback_task_command(): + return build_scheduled_task_command( + { + "SPARKFLOW_MANUAL_RUN": "1", + "SPARKFLOW_MANUAL_UNSENT_ONLY": "1", + "PYTHONUNBUFFERED": "1", + }, + trigger_label="unsent fallback", ) @@ -204,18 +238,21 @@ def get_task_container_rows(): return [] -def run_task_now(): +def run_task_now(*, unsent_only=False): try: log_file = Path(get_app_settings().get("ops_log_file") or "/var/log/douyin-sparkflow.log") command, cwd = build_task_run_spec() + run_env = { + "SPARKFLOW_MANUAL_RUN": "1", + "PYTHONUNBUFFERED": "1", + } + if unsent_only: + run_env["SPARKFLOW_MANUAL_UNSENT_ONLY"] = "1" return run_background_command( command, log_file, cwd=cwd, - env={ - "SPARKFLOW_MANUAL_RUN": "1", - "PYTHONUNBUFFERED": "1", - }, + env=run_env, ) except Exception as exc: import traceback @@ -224,6 +261,10 @@ def run_task_now(): return -1 +def run_unsent_retry_now(): + return run_task_now(unsent_only=True) + + def refresh_proxy(): try: script = Path(get_app_settings().get("proxy_refresh_script") or "") @@ -308,6 +349,7 @@ def validate_time_string(time_string): def replace_douyin_cron_schedule(crontab_text, time_string): schedule = parse_schedule_string(time_string) scheduled_command = build_scheduled_task_command() + fallback_command = build_unsent_fallback_task_command() updated = [] for raw_line in crontab_text.splitlines(): @@ -325,6 +367,10 @@ def replace_douyin_cron_schedule(crontab_text, time_string): f"0 {schedule['endHour']} * * * " f"{scheduled_command} >> /var/log/douyin-sparkflow.log 2>&1" ) + updated.append( + f"{schedule['scheduleIntervalMinutes']} {schedule['endHour']} * * * " + f"{fallback_command} >> /var/log/douyin-sparkflow.log 2>&1" + ) else: updated.append( f"{schedule['minute']} {schedule['hour']} * * * " diff --git a/DouYinSparkFlow/webui/templates/dashboard.html b/DouYinSparkFlow/webui/templates/dashboard.html index 2174585..64c4387 100644 --- a/DouYinSparkFlow/webui/templates/dashboard.html +++ b/DouYinSparkFlow/webui/templates/dashboard.html @@ -7,6 +7,10 @@ {% block topbar_actions %} ✦ 发送控制台 +