From b5e61a14ea2b7a16edf41b14334e49716c49e2fd Mon Sep 17 00:00:00 2001 From: Rixuan Shao <2023311022@bipt.edu.cn> Date: Sat, 20 Jun 2026 01:59:01 +0800 Subject: [PATCH] Sync deployed SparkFlow reliability updates --- DouYinSparkFlow/Dockerfile.server | 18 +- DouYinSparkFlow/core/browser.py | 66 +- DouYinSparkFlow/core/friends.py | 151 +- DouYinSparkFlow/core/tasks.py | 1674 ++++++++++++++--- .../scripts/start_login_desktop.sh | 20 +- DouYinSparkFlow/utils/config.py | 12 +- DouYinSparkFlow/webui/app.py | 186 +- DouYinSparkFlow/webui/ops.py | 227 ++- DouYinSparkFlow/webui/templates/base.html | 605 ++++-- .../webui/templates/dashboard.html | 842 ++++++--- DouYinSparkFlow/webui/templates/login.html | 215 ++- DouYinSparkFlow/webui/templates/logs.html | 209 +- .../webui/templates/send_console.html | 708 +++++-- 13 files changed, 3891 insertions(+), 1042 deletions(-) diff --git a/DouYinSparkFlow/Dockerfile.server b/DouYinSparkFlow/Dockerfile.server index dfc67e2..cda29ba 100644 --- a/DouYinSparkFlow/Dockerfile.server +++ b/DouYinSparkFlow/Dockerfile.server @@ -1,5 +1,4 @@ -ARG PLAYWRIGHT_BASE_IMAGE=mcr.microsoft.com/playwright/python:v1.56.0-jammy -FROM ${PLAYWRIGHT_BASE_IMAGE} +FROM mcr.microsoft.com/playwright/python:v1.56.0-jammy WORKDIR /app @@ -26,12 +25,13 @@ ENV PIP_TRUSTED_HOST=${PIP_TRUSTED_HOST} COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt -RUN set -eux; \ - sed -i 's/archive.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list \ +RUN sed -i 's/archive.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list \ && sed -i 's/security.ubuntu.com/mirrors.aliyun.com/g' /etc/apt/sources.list \ && ln -fs /usr/share/zoneinfo/${TZ} /etc/localtime \ && echo ${TZ} > /etc/timezone \ - && apt-get update && apt-get install -y --no-install-recommends \ + && apt-get update && apt-get install -y \ + cron \ + curl \ fluxbox \ fonts-wqy-zenhei \ fonts-noto-cjk \ @@ -39,6 +39,14 @@ RUN set -eux; \ websockify \ x11vnc \ xfonts-intl-chinese \ + && curl -fsSL -x http://127.0.0.1:7890 https://download.docker.com/linux/static/stable/x86_64/docker-25.0.3.tgz -o docker.tgz \ + && tar xzvf docker.tgz \ + && mv docker/docker /usr/bin/docker \ + && chmod +x /usr/bin/docker \ + && rm -rf docker docker.tgz \ + && mkdir -p /usr/local/lib/docker/cli-plugins \ + && curl -SL -x http://127.0.0.1:7890 https://github.com/docker/compose/releases/download/v2.24.5/docker-compose-linux-x86_64 -o /usr/local/lib/docker/cli-plugins/docker-compose \ + && chmod +x /usr/local/lib/docker/cli-plugins/docker-compose \ && rm -rf /var/lib/apt/lists/* COPY . . diff --git a/DouYinSparkFlow/core/browser.py b/DouYinSparkFlow/core/browser.py index 0be8515..1c358c5 100644 --- a/DouYinSparkFlow/core/browser.py +++ b/DouYinSparkFlow/core/browser.py @@ -1,4 +1,5 @@ import os +import re import subprocess import sys import traceback @@ -12,6 +13,7 @@ from utils.config import DEBUG, Environment, get_environment console = Console() PLAYWRIGHT_BROWSERS_PATH = "../chrome" +DEFAULT_PROFILE_ROOT = "/opt/douyin-sparkflow/state/browser-profiles" def _local_browser_bundle_path(): @@ -32,6 +34,38 @@ def configure_playwright_environment(): os.environ["PLAYWRIGHT_BROWSERS_PATH"] = str(bundle_path.resolve()) +def _headless_for(GUI=False): + headless = not GUI + if get_environment() == Environment.LOCAL and DEBUG: + headless = False + return headless + + +def _browser_args(): + return [ + "--disable-dev-shm-usage", + "--no-sandbox", + ] + + +def sanitize_profile_name(value): + raw = str(value or "").strip() + if not raw: + raw = "unknown" + safe = re.sub(r"[^0-9A-Za-z._-]+", "_", raw) + safe = safe.strip("._-") or "unknown" + return safe[:80] + + +def browser_profile_root(root=None): + configured = ( + root + or os.getenv("SPARKFLOW_BROWSER_PROFILE_ROOT") + or DEFAULT_PROFILE_ROOT + ) + return Path(configured) + + async def install_browser(): try: subprocess.run([sys.executable, "-m", "playwright", "install", "chromium"], check=True) @@ -43,15 +77,11 @@ async def install_browser(): async def get_browser(GUI=False): configure_playwright_environment() - headless = not GUI - if get_environment() == Environment.LOCAL and DEBUG: - headless = False - try: playwright = await async_playwright().start() browser = await playwright.chromium.launch( - headless=headless, - args=["--disable-dev-shm-usage"], + headless=_headless_for(GUI), + args=_browser_args(), ) return playwright, browser except Exception as exc: @@ -61,3 +91,27 @@ async def get_browser(GUI=False): sys.exit(1) traceback.print_exc() raise + + +async def get_persistent_browser_context(profile_name, GUI=False, root=None): + configure_playwright_environment() + + profile_dir = browser_profile_root(root) / sanitize_profile_name(profile_name) + profile_dir.mkdir(parents=True, exist_ok=True) + + try: + playwright = await async_playwright().start() + context = await playwright.chromium.launch_persistent_context( + str(profile_dir), + headless=_headless_for(GUI), + viewport={"width": 1600, "height": 1000}, + args=_browser_args(), + ) + return playwright, context, profile_dir + except Exception as exc: + if "Executable doesn't exist" in str(exc) and get_environment() != Environment.GITHUBACTION: + console.print("[bold red]Playwright browser is missing.[/bold red]") + await install_browser() + sys.exit(1) + traceback.print_exc() + raise diff --git a/DouYinSparkFlow/core/friends.py b/DouYinSparkFlow/core/friends.py index 65c6680..0f509c0 100644 --- a/DouYinSparkFlow/core/friends.py +++ b/DouYinSparkFlow/core/friends.py @@ -1,27 +1,25 @@ import asyncio from pathlib import Path -import time from core.browser import get_browser CHAT_PAGE_URL = "https://creator.douyin.com/creator-micro/data/following/chat" -FRIENDS_TAB_SELECTORS = ( - 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]', - 'xpath=//*[@id="sub-app"]//*[self::div or self::span or self::button][contains(normalize-space(.), "朋友私信") and string-length(normalize-space(.)) <= 20]', - 'xpath=//*[@id="sub-app"]//*[self::div or self::span or self::button][normalize-space()="朋友"]', +FRIENDS_TAB_SELECTOR = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]' +TARGET_SELECTOR = ( + 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]' + '//div[contains(@class, "semi-list-item-body semi-list-item-body-flex-start")]' ) -FRIEND_NAME_SELECTOR = 'xpath=//*[@id="sub-app"]//span[contains(@class, "item-header-name-")]' -SCROLLABLE_FRIENDS_SELECTORS = ( - 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div', - 'xpath=//*[@id="sub-app"]//ul/div', - 'xpath=//*[@id="sub-app"]//ul', +SCROLLABLE_FRIENDS_SELECTOR = ( + 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div' ) NO_MORE_SELECTOR = 'xpath=//div[contains(@class, "no-more-tip-ftdJnu")]' LOADING_SELECTOR = 'xpath=//div[contains(@class, "semi-spin")]' +FIRST_FRIEND_SELECTOR = ( + 'xpath=//*[@id="sub-app"]/div/div/div[2]/div[2]/div/div/div[1]/div/div/div/ul/div/div/div[1]/li/div' +) +FRIEND_NAME_SELECTOR = """xpath=.//span[contains(@class, "item-header-name-")]""" LOGIN_MASK_SELECTORS = [".login-mask", ".login-guide-container", ".login-img-code-wrapper"] -EMPTY_LIST_KEYWORDS = ("暂无", "没有", "空空", "还没有") -LOGIN_KEYWORDS = ("扫码登录", "登录抖音", "请登录", "登录已过期", "重新登录") def update_collection_progress(new_names_count, no_more_visible, scroll_moved, idle_rounds, stuck_rounds, idle_limit=5, stuck_limit=2): @@ -32,10 +30,6 @@ def update_collection_progress(new_names_count, no_more_visible, scroll_moved, i async def _ensure_logged_in(page): - current_url = page.url or "" - if "login" in current_url or "passport" in current_url: - raise RuntimeError("账号登录已失效,请重新扫码登录") - for selector in LOGIN_MASK_SELECTORS: try: locator = page.locator(selector).first @@ -47,115 +41,12 @@ async def _ensure_logged_in(page): continue -async def _body_text(page, limit=600): - try: - text = await page.locator("body").inner_text(timeout=2000) - except Exception: - return "" - return " ".join(text.split())[:limit] - - -async def _page_diagnosis(page): - await _ensure_logged_in(page) - body_text = await _body_text(page) - if any(keyword in body_text for keyword in LOGIN_KEYWORDS): - raise RuntimeError("账号登录已失效或页面要求重新登录,请重新扫码登录") - if any(keyword in body_text for keyword in EMPTY_LIST_KEYWORDS): - return "页面提示当前没有可读取的朋友私信好友" - return f"未等到好友列表。当前URL={page.url},页面提示={body_text or '空'}" - - -async def _open_friends_tab(page): - if await page.locator(FRIEND_NAME_SELECTOR).count() > 0: - return - - last_error = None - for selector in FRIENDS_TAB_SELECTORS: - locator = page.locator(selector).first - try: - await locator.wait_for(state="visible", timeout=10000) - await locator.click(timeout=5000) - await asyncio.sleep(1.5) - return - except Exception as exc: - last_error = exc - - raise RuntimeError(f"未找到“朋友私信”入口,可能页面结构变化或账号未登录。最后错误:{last_error}") - - -async def _wait_for_friend_name_or_empty(page, timeout_ms=45000): - deadline = time.monotonic() + timeout_ms / 1000 - while time.monotonic() < deadline: - await _ensure_logged_in(page) - names = page.locator(FRIEND_NAME_SELECTOR) - if await names.count() > 0: - first = names.first - try: - if await first.is_visible(): - return True - except Exception: - return True - - body_text = await _body_text(page, limit=300) - if any(keyword in body_text for keyword in EMPTY_LIST_KEYWORDS): - return False - if any(keyword in body_text for keyword in LOGIN_KEYWORDS): - raise RuntimeError("账号登录已失效或页面要求重新登录,请重新扫码登录") - - loading = page.locator(LOADING_SELECTOR).first - if await loading.count() > 0 and await loading.is_visible(): - await asyncio.sleep(1.5) - else: - await asyncio.sleep(1) - - diagnosis = await _page_diagnosis(page) - raise RuntimeError(diagnosis) - - -async def _collect_visible_friend_names(page): - names = [] - for raw_name in await page.locator(FRIEND_NAME_SELECTOR).all_inner_texts(): - name = raw_name.strip() - if name: - names.append(name) - return names - - -async def _find_scrollable_friends_element(page): - for selector in SCROLLABLE_FRIENDS_SELECTORS: - try: - handle = await page.locator(selector).first.element_handle(timeout=2000) - if handle: - return handle - except Exception: - continue - - try: - handle = await page.evaluate_handle( - """() => { - const firstName = document.querySelector('#sub-app span[class*="item-header-name-"]'); - let node = firstName; - while (node && node !== document.body) { - const style = window.getComputedStyle(node); - const overflow = `${style.overflow} ${style.overflowY}`; - if (node.scrollHeight > node.clientHeight + 20 && /(auto|scroll|overlay)/.test(overflow)) { - return node; - } - node = node.parentElement; - } - return document.scrollingElement || document.documentElement; - }""" - ) - return handle.as_element() - except Exception: - return None - - async def collect_friend_names(page): - await _open_friends_tab(page) - has_friends = await _wait_for_friend_name_or_empty(page) - if not has_friends: - return [] + await page.wait_for_selector(FRIENDS_TAB_SELECTOR, timeout=30000) + await page.locator(FRIENDS_TAB_SELECTOR).click() + + await page.wait_for_selector(FIRST_FRIEND_SELECTOR, timeout=30000) + await page.locator(FIRST_FRIEND_SELECTOR).click() await asyncio.sleep(2) found_names = [] @@ -164,8 +55,13 @@ async def collect_friend_names(page): stuck_rounds = 0 while True: + target_elements = await page.locator(TARGET_SELECTOR).all() new_names_count = 0 - for name in await _collect_visible_friend_names(page): + for element in target_elements: + try: + name = (await element.locator(FRIEND_NAME_SELECTOR).inner_text()).strip() + except Exception: + continue if not name or name in seen_names: continue seen_names.add(name) @@ -180,12 +76,11 @@ async def collect_friend_names(page): if await loading.count() > 0 and await loading.is_visible(): await asyncio.sleep(1.5) - scrollable_element = await _find_scrollable_friends_element(page) + scrollable_element = await page.locator(SCROLLABLE_FRIENDS_SELECTOR).element_handle() if not scrollable_element: if found_names: return found_names - diagnosis = await _page_diagnosis(page) - raise RuntimeError(f"未找到好友列表滚动容器;{diagnosis}") + raise RuntimeError("未找到好友列表滚动容器") before_top = await page.evaluate("(element) => element.scrollTop", scrollable_element) await page.evaluate("(element) => element.scrollTop += 800", scrollable_element) diff --git a/DouYinSparkFlow/core/tasks.py b/DouYinSparkFlow/core/tasks.py index cb783ad..5d7aa79 100644 --- a/DouYinSparkFlow/core/tasks.py +++ b/DouYinSparkFlow/core/tasks.py @@ -2,22 +2,15 @@ import asyncio import hashlib import logging import os +import random import unicodedata from contextlib import contextmanager from datetime import datetime, timedelta, timezone from pathlib import Path from zoneinfo import ZoneInfo -from core.browser import get_browser -from core.friends import ( - FRIEND_NAME_SELECTOR, - LOADING_SELECTOR, - NO_MORE_SELECTOR, - _find_scrollable_friends_element, - _open_friends_tab, - _wait_for_friend_name_or_empty, -) -from core.msg_builder import build_message +from core.browser import get_browser, get_persistent_browser_context, sanitize_profile_name +from core.msg_builder import build_message, build_message_candidates from core.protocol_dispatch import run_protocol_tasks from utils.config import get_config, get_userData, normalize_unique_id, save_userData from utils.logger import setup_logger @@ -28,6 +21,8 @@ user_data = get_userData() logger = setup_logger(level=logging.DEBUG) debug_artifacts_dir = Path("logs/debug_artifacts") debug_artifacts_dir.mkdir(parents=True, exist_ok=True) +CREATOR_HOME_URL = "https://creator.douyin.com/" +CREATOR_CHAT_URL = "https://creator.douyin.com/creator-micro/data/following/chat" async def retry_operation(name, operation, retries=3, delay=2, *args, **kwargs): @@ -63,6 +58,315 @@ def _current_run_mode(): return "manual" if _is_manual_run() else "scheduled" +def _coerce_non_negative_int(value, default): + try: + return max(0, int(value)) + except (TypeError, ValueError): + return max(0, int(default)) + + +def _coerce_positive_int(value, default): + try: + return max(1, int(value)) + except (TypeError, ValueError): + return max(1, int(default)) + + +def _coerce_positive_float(value, default): + try: + parsed = float(value) + except (TypeError, ValueError): + parsed = float(default) + if parsed <= 0: + parsed = float(default) + return parsed + + +def _normalize_friend_list_scan_config(active_config): + raw = active_config.get("friendListScan", {}) or {} + return { + "maxScanSeconds": _coerce_positive_int(raw.get("maxScanSeconds", 300), 300), + "idleScanSeconds": _coerce_positive_int(raw.get("idleScanSeconds", 120), 120), + "scrollStepPx": _coerce_positive_int(raw.get("scrollStepPx", 400), 400), + "scrollDelaySeconds": _coerce_positive_float(raw.get("scrollDelaySeconds", 1.5), 1.5), + } + + +def _normalize_send_strategy(active_config): + raw = active_config.get("sendStrategy", {}) or {} + start_min = _coerce_non_negative_int(raw.get("accountStartDelaySecondsMin", 0), 0) + start_max = _coerce_non_negative_int(raw.get("accountStartDelaySecondsMax", start_min), start_min) + if start_max < start_min: + start_max = start_min + + message_min = _coerce_non_negative_int(raw.get("messageIntervalSecondsMin", 0), 0) + message_max = _coerce_non_negative_int(raw.get("messageIntervalSecondsMax", message_min), message_min) + if message_max < message_min: + message_max = message_min + + return { + "accountStartDelaySecondsMin": start_min, + "accountStartDelaySecondsMax": start_max, + "messageIntervalSecondsMin": message_min, + "messageIntervalSecondsMax": message_max, + } + + +def _normalize_persistent_profile_config(active_config): + raw = active_config.get("persistentBrowserProfiles", {}) or {} + return { + "enabled": bool(raw.get("enabled", False)), + "root": str(raw.get("root") or "/opt/douyin-sparkflow/state/browser-profiles"), + "seedCookiesWhenEmpty": bool(raw.get("seedCookiesWhenEmpty", True)), + "syncStoredCookiesBeforeRun": bool(raw.get("syncStoredCookiesBeforeRun", True)), + "refreshStoredCookiesAfterLogin": bool(raw.get("refreshStoredCookiesAfterLogin", True)), + } + + +def _account_profile_name(user): + unique_id = normalize_unique_id(user.get("unique_id")) + username = str(user.get("username") or "").strip() + if unique_id: + return f"uid-{unique_id}" + if username: + return f"user-{sanitize_profile_name(username)}" + return "unknown" + + +def _random_delay_seconds(send_strategy, min_key, max_key): + return random.randint(send_strategy[min_key], send_strategy[max_key]) + + +async def _sleep_with_log(seconds, reason, account_name): + if seconds <= 0: + return + logger.info("%s for %s by %ss", reason, account_name, seconds) + await asyncio.sleep(seconds) + + +LOGIN_REQUIRED_TEXTS = ( + "扫码登录", + "验证码登录", + "密码登录", + "登录/注册", + "登录后免费", + "请登录", + "身份验证", + "安全验证", + "风险提示", + "环境异常", +) +LOGIN_REQUIRED_SELECTORS = ( + ".login-mask", + ".login-guide-container", + ".login-img-code-wrapper", + ".pc-login-verification-modal", +) +LOGIN_REQUIRED_DIALOG_SELECTORS = ( + ".semi-modal-content", + 'div[role="dialog"]', +) +NON_LOGIN_DIALOG_DISMISS_TEXTS = ( + "我知道了", + "知道了", + "好的", + "确定", + "确认", + "稍后再说", + "关闭", +) +NON_LOGIN_DIALOG_CLOSE_SELECTORS = ( + ".semi-modal-close", + 'button[aria-label="Close"]', + 'button[aria-label="关闭"]', + '[aria-label="Close"]', + '[aria-label="关闭"]', +) +AUTH_COOKIE_NAMES = { + "sessionid", + "sessionid_ss", + "sid_guard", + "sid_tt", + "uid_tt", + "uid_tt_ss", +} + + +async def _body_text_sample(page, limit=1200): + try: + return (await page.locator("body").inner_text(timeout=3000)).replace("\n", " ")[:limit] + except Exception as exc: + return f"BODY_TEXT_ERROR={exc!r}" + + +def _find_login_required_text(text): + sample = text or "" + for marker in LOGIN_REQUIRED_TEXTS: + if marker in sample: + return marker + return None + + +async def _page_has_login_required_prompt(page): + for selector in LOGIN_REQUIRED_SELECTORS: + locator = page.locator(selector).first + try: + if await locator.count() > 0 and await locator.is_visible(timeout=500): + return True, f"visible selector {selector}" + except Exception: + continue + + for selector in LOGIN_REQUIRED_DIALOG_SELECTORS: + locator = page.locator(selector) + try: + count = min(await locator.count(), 5) + except Exception: + continue + + for index in range(count): + item = locator.nth(index) + try: + if not await item.is_visible(timeout=500): + continue + dialog_text = (await item.inner_text(timeout=1000)).replace("\n", " ") + except Exception: + continue + + matched_text = _find_login_required_text(dialog_text) + if matched_text: + return True, ( + f"visible dialog {selector} contains {matched_text!r}: " + f"{dialog_text[:300]}" + ) + logger.debug( + "Ignoring visible non-login dialog for selector %s: %s", + selector, + dialog_text[:160], + ) + + sample = await _body_text_sample(page) + matched_text = _find_login_required_text(sample) + if matched_text: + return True, f"body contains {matched_text!r}: {sample[:300]}" + return False, "" + + +async def ensure_not_login_required(page, account_name, stage): + is_required, detail = await _page_has_login_required_prompt(page) + if is_required: + raise RuntimeError(f"login_required at {stage} for {account_name}: {detail}") + + +async def _dismiss_non_login_dialogs(page, account_name, stage): + dismissed = 0 + for selector in LOGIN_REQUIRED_DIALOG_SELECTORS: + locator = page.locator(selector) + try: + count = min(await locator.count(), 5) + except Exception: + continue + + for index in range(count): + item = locator.nth(index) + try: + if not await item.is_visible(timeout=500): + continue + dialog_text = (await item.inner_text(timeout=1000)).replace("\n", " ") + except Exception: + continue + + if _find_login_required_text(dialog_text): + continue + + clicked = False + for text in NON_LOGIN_DIALOG_DISMISS_TEXTS: + button = item.get_by_text(text, exact=True).first + try: + if await button.count() > 0 and await button.is_visible(timeout=500): + await button.click(timeout=2000) + clicked = True + break + except Exception: + continue + + if not clicked: + for close_selector in NON_LOGIN_DIALOG_CLOSE_SELECTORS: + close_button = item.locator(close_selector).first + try: + if await close_button.count() > 0 and await close_button.is_visible(timeout=500): + await close_button.click(timeout=2000) + clicked = True + break + except Exception: + continue + + if clicked: + dismissed += 1 + logger.info( + "Dismissed non-login dialog for %s at %s: %s", + account_name, + stage, + dialog_text[:160], + ) + await asyncio.sleep(0.5) + else: + logger.debug( + "Visible non-login dialog for %s at %s was not dismissed: %s", + account_name, + stage, + dialog_text[:160], + ) + return dismissed + + +async def apply_stored_cookies_to_profile(context, cookies, account_name, only_when_empty=False): + if not cookies: + return + if only_when_empty: + try: + current_cookies = await context.cookies( + ["https://creator.douyin.com/", "https://www.douyin.com/"] + ) + except Exception: + current_cookies = [] + + if any(cookie.get("name") in AUTH_COOKIE_NAMES for cookie in current_cookies): + logger.info( + "Persistent profile for %s already has auth cookies; not seeding stored cookies", + account_name, + ) + return + + await context.add_cookies(cookies) + logger.info("Applied %s stored cookies to persistent profile for %s", len(cookies), account_name) + + +async def refresh_stored_cookies_from_profile(context, user, account_name): + try: + cookies = await context.cookies() + except Exception as exc: + logger.warning("Could not read persistent profile cookies for %s: %s", account_name, exc) + return + + if not cookies: + return + + accounts = get_userData(force_reload=True) + matched_account = _find_matching_account(accounts, user) + if matched_account is None: + logger.warning("Could not find account to refresh cookies for user=%s", account_name) + return + + matched_account["cookies"] = list(cookies) + save_userData(accounts) + user["cookies"] = list(cookies) + logger.info("Refreshed stored cookies for %s from persistent profile count=%s", account_name, len(cookies)) + + +def _is_stop_account_category(category): + return _is_account_level_failure_category(category) + + async def save_debug_artifacts(page, account_name, target_name, stage): if not get_config(force_reload=True).get("saveDebugArtifacts", False): return @@ -109,45 +413,138 @@ async def read_chat_input_text(chat_input): return "" -async def confirm_message_sent(page, chat_input, message): - await asyncio.sleep(2) +def _message_probe_text(message): + for line in str(message or "").splitlines(): + probe = line.strip() + if probe: + return probe + return str(message or "").strip() - input_text = await read_chat_input_text(chat_input) - if not input_text: - return True, "chat input cleared" - first_line = message.split("\n")[0].strip() - if first_line: +async def count_visible_message_matches(page, message, chat_input=None): + probe = _message_probe_text(message) + if not probe: + return 0 + + locator = page.locator(f"text={probe}") + try: + count = await locator.count() + except Exception: + return 0 + + input_box = None + if chat_input is not None: try: - bubble = page.locator(f"text={first_line}").last - if await bubble.count() > 0: - return True, "message bubble located" + input_box = await chat_input.bounding_box(timeout=1000) except Exception: - pass + input_box = None - return False, f"chat input still contains: {input_text!r}" + visible_count = 0 + for index in range(min(count, 200)): + try: + item = locator.nth(index) + if not await item.is_visible(timeout=500): + continue + if await item.evaluate("(node) => Boolean(node.closest('[contenteditable=\"true\"]'))"): + continue + if input_box: + box = await item.bounding_box(timeout=500) + if not box: + continue + center_x = box["x"] + box["width"] / 2 + center_y = box["y"] + box["height"] / 2 + in_chat_column = ( + input_box["x"] - 80 + <= center_x + <= input_box["x"] + input_box["width"] + 80 + ) + above_input = center_y < input_box["y"] + input_box["height"] + 80 + if not (in_chat_column and above_input): + continue + visible_count += 1 + except Exception: + continue + return visible_count -async def detect_message_already_sent(page, chat_input, message): +async def confirm_message_sent(page, chat_input, message, before_visible_count=None): + if before_visible_count is None: + before_visible_count = await count_visible_message_matches(page, message, chat_input=chat_input) + + last_input_text = "" + last_visible_count = before_visible_count + deadline = asyncio.get_running_loop().time() + 8 + while True: + await asyncio.sleep(1) + last_input_text = await read_chat_input_text(chat_input) + last_visible_count = await count_visible_message_matches(page, message, chat_input=chat_input) + if not last_input_text and last_visible_count > before_visible_count: + return ( + True, + f"visible message count increased {before_visible_count}->{last_visible_count}; chat input cleared", + ) + if asyncio.get_running_loop().time() >= deadline: + break + + if last_input_text: + return False, f"chat input still contains: {last_input_text!r}" + return ( + False, + f"chat input cleared but visible message count did not increase: " + f"before={before_visible_count} after={last_visible_count}", + ) + + +async def find_visible_spark_message_candidate(page, chat_input=None, candidates=None): + messages = [] + for candidate in candidates or build_message_candidates(): + probe = _message_probe_text(candidate) + if probe and probe not in messages: + messages.append(probe) + + for candidate in messages: + try: + if await count_visible_message_matches(page, candidate, chat_input=chat_input) > 0: + return candidate + except Exception: + continue + return "" + + +async def detect_message_already_sent(page, chat_input, message, before_visible_count=None): try: if chat_input is not None: - sent_ok, detail = await confirm_message_sent(page, chat_input, message) + sent_ok, detail = await confirm_message_sent( + page, + chat_input, + message, + before_visible_count=before_visible_count, + ) if sent_ok: return True, detail except Exception: pass - first_line = message.split("\n")[0].strip() + first_line = _message_probe_text(message) if not first_line: return False, "" try: - bubble = page.locator(f"text={first_line}").last - if await bubble.count() > 0: + visible_count = await count_visible_message_matches(page, first_line, chat_input=chat_input) + if visible_count > (before_visible_count or 0): return True, "message bubble located after failure" except Exception: pass + try: + input_text = await read_chat_input_text(chat_input) if chat_input is not None else "" + if not input_text: + matched_candidate = await find_visible_spark_message_candidate(page, chat_input=chat_input) + if matched_candidate: + return True, f"spark message candidate visible in current chat: {matched_candidate!r}" + except Exception: + pass + return False, "" @@ -155,8 +552,14 @@ def classify_browser_failure(stage, exc): detail = str(exc or "") lowered = detail.lower() - if "登录" in detail or "login" in lowered or "passport" in lowered: - return "login_expired" + if "friend_list_unavailable" in lowered: + return "friend_list_unavailable" + if "friend_list_incomplete" in lowered: + return "friend_list_incomplete" + if "login_required" in lowered: + return "login_required" + if any(token in detail for token in LOGIN_REQUIRED_TEXTS): + return "login_required" if "page crashed" in lowered or "target page, context or browser has been closed" in lowered: return "page_crashed" if "timeout" in lowered: @@ -167,13 +570,11 @@ def classify_browser_failure(stage, exc): if stage == "friend_list": return "friend_list_timeout" return "timeout" - if "未找到“朋友私信”入口" in detail: - return "friend_tab_not_found" if "unable to locate chat input" in lowered: return "chat_input_not_found" - if "could not find the friend list scroll container" in lowered or "未找到好友列表滚动容器" in detail: + if "could not find the friend list scroll container" in lowered: return "friend_list_container_missing" - if "chat input still contains" in lowered: + if "chat input still contains" in lowered or "visible message count did not increase" in lowered: return "send_unconfirmed" if "missing targets" in lowered: return "friend_not_found" @@ -182,98 +583,464 @@ def classify_browser_failure(stage, exc): return "unknown" -async def _click_friend_entry(name_locator, account_name, target_name): - click_candidates = ( - "xpath=ancestor::li[1]", - 'xpath=ancestor::div[contains(@class, "semi-list-item")][1]', - 'xpath=ancestor::div[contains(@class, "semi-list-item-body")][1]', - 'xpath=ancestor::*[@role="listitem"][1]', - "xpath=ancestor::div[3]", - ) +class FriendListUnavailableError(RuntimeError): + pass - last_error = None - for selector in click_candidates: + +class FriendListIncompleteError(RuntimeError): + pass + + +ACCOUNT_LEVEL_FAILURE_CATEGORIES = { + "login_required", + "friend_list_unavailable", + "friend_list_timeout", + "friend_list_incomplete", + "friend_list_container_missing", + "navigation_timeout", + "navigation_failed", + "page_crashed", +} + + +TEMPORARY_ACCOUNT_FAILURE_CATEGORIES = { + "friend_list_unavailable", + "friend_list_timeout", + "friend_list_incomplete", + "friend_list_container_missing", + "navigation_timeout", + "navigation_failed", + "page_crashed", + "timeout", +} + + +def _is_account_level_failure_category(category): + return category in ACCOUNT_LEVEL_FAILURE_CATEGORIES + + +def _target_display_name(target): + if isinstance(target, dict): + for key in ("name", "displayName", "nickname", "username", "unique_id", "uid"): + value = str(target.get(key) or "").strip() + if value: + return value + return "" + return str(target or "").strip() + + +def _build_normalized_target_map(targets): + normalized_targets = {} + for target in targets or []: + display_name = _target_display_name(target) + normalized_name = _normalize_target_name(display_name) + if normalized_name: + normalized_targets[normalized_name] = display_name + return normalized_targets + + +async def _first_non_empty_locator(page, selectors): + for selector in selectors: + locator = page.locator(selector) try: - entry = name_locator.locator(selector).first - if await entry.count() == 0: + count = await locator.count() + except Exception: + continue + for index in range(min(count, 5)): + item = locator.nth(index) + try: + if await item.is_visible(timeout=500): + return selector, locator + except Exception: continue - await entry.scroll_into_view_if_needed(timeout=5000) - await entry.click(timeout=5000) - return + return "", None + + +async def _selector_visible(page, selectors): + for selector in selectors: + locator = page.locator(selector).first + try: + if await locator.count() > 0 and await locator.is_visible(timeout=500): + return selector + except Exception: + continue + return "" + + +async def _click_first_visible_locator(candidates, account_name, stage): + last_error = None + for label, locator in candidates: + try: + count = min(await locator.count(), 5) except Exception as exc: last_error = exc + continue + + for index in range(count): + item = locator.nth(index) + try: + if not await item.is_visible(timeout=800): + continue + await item.click(timeout=3000) + logger.info("Account %s clicked %s at %s", account_name, label, stage) + return True + except Exception as exc: + last_error = exc + continue + if last_error: + logger.debug("Account %s could not click a visible locator at %s: %s", account_name, stage, last_error) + return False + + +async def _open_friends_tab(page, account_name, fallback_selector, target_selectors): + await _dismiss_non_login_dialogs(page, account_name, "before_open_friends_tab") + + _, existing_locator = await _first_non_empty_locator(page, target_selectors) + if existing_locator: + logger.info("Account %s friend list is already visible before tab click", account_name) + return "already_visible" + + sub_app = page.locator('xpath=//*[@id="sub-app"]') + text_selector = ( + 'xpath=//*[@id="sub-app"]//*[self::div or self::button or self::span or @role="tab"]' + '[contains(normalize-space(.), "朋友") or contains(normalize-space(.), "好友") ' + 'or contains(normalize-space(.), "互关")]' + ) + candidates = [ + ("tab text 朋友", sub_app.get_by_text("朋友", exact=True)), + ("tab text 好友", sub_app.get_by_text("好友", exact=True)), + ("tab text 互关", sub_app.get_by_text("互关", exact=True)), + ("tab text 朋友私信", sub_app.get_by_text("朋友私信", exact=True)), + ("tab text 好友私信", sub_app.get_by_text("好友私信", exact=True)), + ("sub-app friend tab text", page.locator(text_selector)), + ("fallback friends tab xpath", page.locator(fallback_selector)), + ] + if await _click_first_visible_locator(candidates, account_name, "open_friends_tab"): + await asyncio.sleep(1) + return "clicked" + + await page.wait_for_selector(fallback_selector, timeout=30000) + await page.locator(fallback_selector).click(timeout=5000) + logger.info("Account %s clicked fallback friends tab after explicit wait", account_name) + await asyncio.sleep(1) + return "fallback_wait_clicked" + + +async def _reopen_friend_chat_page(page, account_name): + logger.warning( + "Account %s friend list did not become visible within 60s; reopening chat page once", + account_name, + ) + await _dismiss_non_login_dialogs(page, account_name, "before_reopen_chat_page") + try: + await page.reload(wait_until="load", timeout=120000) + await asyncio.sleep(2) + except Exception as exc: + logger.warning("Account %s reload before reopen chat page failed: %s", account_name, exc) + + await retry_operation( + "reopen chat page after friend list wait", + page.goto, + retries=2, + delay=5, + url=CREATOR_CHAT_URL, + ) + await asyncio.sleep(3) + await ensure_not_login_required(page, account_name, "reopen_chat_page") + await _dismiss_non_login_dialogs(page, account_name, "after_reopen_chat_page") + + +async def _wait_for_friend_list_ready( + page, + account_name, + target_selectors, + no_more_selectors, + loading_selectors, + timeout_seconds=120, + empty_grace_seconds=45, +): + started_at = asyncio.get_running_loop().time() + last_loading_seen_at = None + last_dialog_dismissed_at = None + while True: + dismissed_count = await _dismiss_non_login_dialogs(page, account_name, "friend_list_ready") + if dismissed_count: + last_dialog_dismissed_at = asyncio.get_running_loop().time() + + selector, locator = await _first_non_empty_locator(page, target_selectors) + if locator: + logger.debug("Account %s friend list ready via selector %s", account_name, selector) + return selector, locator + + is_login_required, login_detail = await _page_has_login_required_prompt(page) + if is_login_required: + raise RuntimeError(f"login_required at friend_list for {account_name}: {login_detail}") + + no_more_selector = await _selector_visible(page, no_more_selectors) + if no_more_selector: + elapsed = asyncio.get_running_loop().time() - started_at + if elapsed > empty_grace_seconds: + raise FriendListUnavailableError( + f"friend_list_unavailable for {account_name}: no friend entries visible; " + f"empty/end selector {no_more_selector} is visible after {empty_grace_seconds}s" + ) + logger.debug( + "Account %s saw empty/end selector %s after %.1fs; waiting within grace window", + account_name, + no_more_selector, + elapsed, + ) + + loading_selector = await _selector_visible(page, loading_selectors) + if loading_selector: + last_loading_seen_at = asyncio.get_running_loop().time() + logger.debug("Account %s waiting for friend list loading selector %s", account_name, loading_selector) + + elapsed = asyncio.get_running_loop().time() - started_at + if elapsed > timeout_seconds: + loading_note = "" + if last_loading_seen_at: + loading_note = f"; loading was visible {elapsed - last_loading_seen_at:.1f}s ago" + if last_dialog_dismissed_at: + loading_note += f"; dialog was dismissed {elapsed - (last_dialog_dismissed_at - started_at):.1f}s ago" + raise FriendListUnavailableError( + f"friend_list_unavailable for {account_name}: no friend entries became visible " + f"within {timeout_seconds}s{loading_note}" + ) + + await asyncio.sleep(1) + + +async def _extract_friend_display_name(element): + name_selectors = ( + """xpath=.//span[contains(@class, "item-header-name-")]""", + """xpath=.//*[contains(@class, "item-header-name-")]""", + ) + for selector in name_selectors: + locator = element.locator(selector).first + try: + if await locator.count() > 0: + text = (await locator.inner_text(timeout=1000)).strip() + if text: + return text + except Exception: + continue try: - await name_locator.scroll_into_view_if_needed(timeout=5000) - await name_locator.click(timeout=5000) - except Exception as exc: - raise RuntimeError( - f"Account {account_name} found target friend {target_name}, but could not click the friend entry: {exc}" - ) from (last_error or exc) + text = (await element.inner_text(timeout=1000)).strip() + except Exception: + return "" + for line in text.splitlines(): + line = line.strip() + if line: + return line + return "" -async def scroll_and_select_user(page, account_name, targets): - logger.debug("Account %s is opening the friends tab", account_name) - await _open_friends_tab(page) - has_friends = await _wait_for_friend_name_or_empty(page, timeout_ms=45000) - if not has_friends: - logger.warning("Account %s friend list is empty or unavailable", account_name) - return +async def _extract_friend_stable_keys(element): + stable_keys = set() + attrs = ( + "data-e2e", + "data-id", + "data-user-id", + "data-conversation-id", + "data-sec-uid", + "href", + "aria-label", + "title", + ) + for attr in attrs: + try: + value = await element.get_attribute(attr) + except Exception: + value = None + if value: + stable_keys.add(f"{attr}:{str(value).strip()[:240]}") + + try: + links = await element.locator("xpath=.//a[@href]").all() + except Exception: + links = [] + for link in links[:3]: + try: + href = await link.get_attribute("href") + except Exception: + href = None + if href: + stable_keys.add(f"href:{str(href).strip()[:240]}") + + return sorted(stable_keys) + + +async def _extract_friend_record(element): + display_name = await _extract_friend_display_name(element) + normalized_name = _normalize_target_name(display_name) + if not normalized_name: + return None + return { + "visibleName": display_name, + "normalizedName": normalized_name, + "stableKeys": await _extract_friend_stable_keys(element), + } + + +async def _first_scrollable_friends_element(page, selectors): + for selector in selectors: + try: + handle = await page.locator(selector).first.element_handle() + except Exception: + handle = None + if handle: + return selector, handle + return "", None + + +async def scroll_and_select_user(page, user, account_name, targets, friend_scan_config=None, index_targets=None): + friends_tab_selector = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]' + target_selectors = ( + 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]' + '//div[contains(@class, "semi-list-item-body")]', + 'xpath=//*[@id="sub-app"]//div[contains(@class, "semi-list-item-body") and .//span[contains(@class, "item-header-name-")]]', + 'xpath=//*[@id="sub-app"]//li[.//span[contains(@class, "item-header-name-")]]', + ) + scrollable_friends_selectors = ( + 'xpath=//*[@id="sub-app"]/div/div[1]/div[2]/div[2]/div/div/div[3]/div/div/div/ul/div', + 'xpath=//*[@id="sub-app"]//ul/div', + 'xpath=//*[@id="sub-app"]//div[contains(@class, "semi-list")]//ul/..', + ) + no_more_selectors = ( + 'xpath=//div[contains(@class, "no-more-tip-ftdJnu")]', + 'xpath=//*[@id="sub-app"]//*[contains(text(), "没有更多") or contains(text(), "暂无")]', + ) + loading_selectors = ( + 'xpath=//div[contains(@class, "semi-spin")]', + 'xpath=//*[@id="sub-app"]//*[contains(@class, "loading") or contains(@class, "Loading")]', + ) + friend_scan_config = friend_scan_config or _normalize_friend_list_scan_config({}) + max_scan_seconds = friend_scan_config["maxScanSeconds"] + idle_scan_seconds = friend_scan_config["idleScanSeconds"] + scroll_step_px = friend_scan_config["scrollStepPx"] + scroll_delay_seconds = friend_scan_config["scrollDelaySeconds"] + ready_timeout_seconds = 60 + empty_grace_seconds = 30 + + ready_errors = [] + active_target_selector = "" + for ready_attempt in range(2): + logger.debug("Account %s is opening the friends tab attempt=%s", account_name, ready_attempt + 1) + await _open_friends_tab(page, account_name, friends_tab_selector, target_selectors) + + try: + active_target_selector, _ = await _wait_for_friend_list_ready( + page, + account_name, + target_selectors, + no_more_selectors, + loading_selectors, + timeout_seconds=ready_timeout_seconds, + empty_grace_seconds=empty_grace_seconds, + ) + break + except FriendListUnavailableError as exc: + ready_errors.append(str(exc)) + if ready_attempt == 0: + await _reopen_friend_chat_page(page, account_name) + continue + raise FriendListUnavailableError( + f"friend_list_unavailable for {account_name}: no friend entries after " + f"two {ready_timeout_seconds}s waits; first={ready_errors[0]}; second={ready_errors[-1]}" + ) from exc + await asyncio.sleep(2) - normalized_targets = { - _normalize_target_name(target): str(target) - for target in targets - if _normalize_target_name(target) - } + normalized_targets = _build_normalized_target_map(targets) + normalized_index_targets = _build_normalized_target_map(index_targets or targets) found_usernames = set() remaining_targets = set(normalized_targets) + remaining_index_targets = set(normalized_index_targets) + friend_index = {} scan_started_at = asyncio.get_running_loop().time() last_new_friend_at = scan_started_at - max_scan_seconds = 300 - idle_scan_seconds = 120 - stuck_rounds = 0 + logger.debug( + "Account %s friend list scan config maxScanSeconds=%s idleScanSeconds=%s scrollStepPx=%s scrollDelaySeconds=%s readyTimeoutSeconds=%s emptyGraceSeconds=%s", + account_name, + max_scan_seconds, + idle_scan_seconds, + scroll_step_px, + scroll_delay_seconds, + ready_timeout_seconds, + empty_grace_seconds, + ) def missing_target_names(): return sorted(normalized_targets[item] for item in remaining_targets) - if not remaining_targets: - logger.info("Account %s has no normalized target friends to scan", account_name) - return + def missing_index_target_names(): + return sorted(normalized_index_targets[item] for item in remaining_index_targets) + + def persist_index(scan_complete): + _persist_friend_index( + user, + friend_index, + datetime.now(timezone.utc).isoformat(timespec="seconds"), + scan_complete=scan_complete, + missing_targets=missing_index_target_names(), + ) while True: now_monotonic = asyncio.get_running_loop().time() if now_monotonic - scan_started_at > max_scan_seconds: + persist_index(False) logger.warning( - "Account %s friend list scan timed out after %ss. Missing targets: %s; scannedFriends=%s", + "Account %s friend list scan timed out after %ss. Missing delivery targets: %s; missing indexed targets=%s; scannedFriends=%s", account_name, max_scan_seconds, missing_target_names(), + missing_index_target_names(), len(found_usernames), ) - return + raise FriendListIncompleteError( + f"friend_list_incomplete for {account_name}: scan timed out after {max_scan_seconds}s; " + f"missingTargets={missing_target_names()}; scannedFriends={len(found_usernames)}" + ) - name_elements = await page.locator(FRIEND_NAME_SELECTOR).all() + await _dismiss_non_login_dialogs(page, account_name, "friend_list_scan") + selector, target_locator = await _first_non_empty_locator(page, (active_target_selector,) + tuple(target_selectors)) + if not target_locator: + active_target_selector, _ = await _wait_for_friend_list_ready( + page, + account_name, + target_selectors, + no_more_selectors, + loading_selectors, + timeout_seconds=30, + empty_grace_seconds=15, + ) + continue + target_elements = await target_locator.all() + clicked_delivery_target = False - for name_locator in name_elements: - try: - target_name = (await name_locator.inner_text(timeout=3000)).strip() - except Exception: + for element in target_elements: + friend_record = await _extract_friend_record(element) + if not friend_record: continue - normalized_target_name = _normalize_target_name(target_name) + target_name = friend_record["visibleName"] + normalized_target_name = friend_record["normalizedName"] if not normalized_target_name: continue if normalized_target_name in found_usernames: continue found_usernames.add(normalized_target_name) + friend_index[normalized_target_name] = friend_record + remaining_index_targets.discard(normalized_target_name) last_new_friend_at = asyncio.get_running_loop().time() logger.debug("Account %s found friend entry %s", account_name, target_name) matched_target_name = normalized_targets.get(normalized_target_name) - if matched_target_name: - await _click_friend_entry(name_locator, account_name, target_name) + if matched_target_name and normalized_target_name in remaining_targets: + await element.click() logger.info("Account %s selected target friend %s", account_name, target_name) if matched_target_name != target_name: logger.info( @@ -282,59 +1049,75 @@ async def scroll_and_select_user(page, account_name, targets): matched_target_name, target_name, ) + scan_paused_at = asyncio.get_running_loop().time() yield matched_target_name + scan_resumed_at = asyncio.get_running_loop().time() + scan_pause_seconds = max(0, scan_resumed_at - scan_paused_at) + if scan_pause_seconds: + scan_started_at += scan_pause_seconds + last_new_friend_at += scan_pause_seconds + logger.debug( + "Account %s paused friend list scan timer for %.1fs while delivering %s", + account_name, + scan_pause_seconds, + matched_target_name, + ) remaining_targets.discard(normalized_target_name) - if not remaining_targets: - logger.info("Account %s found all target friends", account_name) + clicked_delivery_target = True + if not remaining_targets and not remaining_index_targets: + logger.info("Account %s found all delivery and indexed target friends", account_name) + persist_index(True) return break + if clicked_delivery_target: + continue + + if not remaining_targets and not remaining_index_targets: + logger.info("Account %s found all delivery and indexed target friends", account_name) + persist_index(True) + return + else: - no_more = page.locator(NO_MORE_SELECTOR).first - if await no_more.count() > 0 and await no_more.is_visible(): + if await _selector_visible(page, no_more_selectors): + persist_index(True) logger.warning( - "Account %s reached the end of the friend list. Missing targets: %s", + "Account %s reached the end of the friend list. Missing delivery targets: %s; missing indexed targets=%s", account_name, missing_target_names(), + missing_index_target_names(), ) return now_monotonic = asyncio.get_running_loop().time() if found_usernames and now_monotonic - last_new_friend_at > idle_scan_seconds: + persist_index(False) logger.warning( - "Account %s friend list scan made no progress for %ss. Missing targets: %s; scannedFriends=%s", + "Account %s friend list scan made no progress for %ss. Missing delivery targets: %s; missing indexed targets=%s; scannedFriends=%s", account_name, idle_scan_seconds, missing_target_names(), + missing_index_target_names(), len(found_usernames), ) - return + raise FriendListIncompleteError( + f"friend_list_incomplete for {account_name}: no new friends for {idle_scan_seconds}s; " + f"missingTargets={missing_target_names()}; scannedFriends={len(found_usernames)}" + ) - loading = page.locator(LOADING_SELECTOR).first - if await loading.count() > 0 and await loading.is_visible(): + if await _selector_visible(page, loading_selectors): logger.debug("Account %s is waiting for more friends to load", account_name) - await asyncio.sleep(1.5) + await asyncio.sleep(scroll_delay_seconds) - scrollable_element = await _find_scrollable_friends_element(page) + scrollable_selector, scrollable_element = await _first_scrollable_friends_element(page, scrollable_friends_selectors) if not scrollable_element: - raise RuntimeError(f"Account {account_name} could not find the friend list scroll container") + raise FriendListUnavailableError( + f"friend_list_unavailable for {account_name}: could not find the friend list scroll container" + ) - before_top = await page.evaluate("(element) => element.scrollTop", scrollable_element) - await page.evaluate("(element) => element.scrollTop += 800", scrollable_element) - await asyncio.sleep(1.5) - after_top = await page.evaluate("(element) => element.scrollTop", scrollable_element) - if after_top > before_top: - stuck_rounds = 0 - else: - stuck_rounds += 1 - if stuck_rounds >= 3: - logger.warning( - "Account %s friend list stopped scrolling. Missing targets: %s; scannedFriends=%s", - account_name, - missing_target_names(), - len(found_usernames), - ) - return + logger.debug("Account %s scrolls friend list via selector %s", account_name, scrollable_selector) + await page.evaluate(f"(element) => element.scrollTop += {scroll_step_px}", scrollable_element) + await asyncio.sleep(scroll_delay_seconds) def _is_manual_run(): @@ -426,6 +1209,75 @@ def _target_failed_today(user, target_name, now): return bool(last_attempt_at and last_attempt_at.date() == now.date()) +def _account_failure_entry_today(user, now): + entry = dict(user.get("account_failure") or {}) + last_attempt_at = _parse_sent_at(entry.get("lastAttemptAt"), now.tzinfo) + if last_attempt_at and last_attempt_at.date() == now.date(): + return entry + return {} + + +def _account_failure_attempts_today(user, now): + return _coerce_attempt_count(_account_failure_entry_today(user, now)) + + +def _account_failure_pause_after_attempts(): + raw_value = str(os.getenv("SPARKFLOW_ACCOUNT_FAILURE_PAUSE_AFTER_ATTEMPTS") or "2").strip() + try: + return max(1, int(raw_value)) + except ValueError: + logger.warning("Invalid SPARKFLOW_ACCOUNT_FAILURE_PAUSE_AFTER_ATTEMPTS=%r, using 2", raw_value) + return 2 + + +def _temporary_account_failure_cooldown_minutes(): + raw_value = str(os.getenv("SPARKFLOW_TEMP_ACCOUNT_FAILURE_COOLDOWN_MINUTES") or "60").strip() + try: + return max(1, int(raw_value)) + except ValueError: + logger.warning("Invalid SPARKFLOW_TEMP_ACCOUNT_FAILURE_COOLDOWN_MINUTES=%r, using 60", raw_value) + return 60 + + +def _account_paused_by_failure_today(user, now): + entry = _account_failure_entry_today(user, now) + if _coerce_attempt_count(entry) < _account_failure_pause_after_attempts(): + return False + + category = str(entry.get("category") or "") + if category not in TEMPORARY_ACCOUNT_FAILURE_CATEGORIES: + return True + + last_attempt_at = _parse_sent_at(entry.get("lastAttemptAt"), now.tzinfo) + if not last_attempt_at: + return True + + cooldown_seconds = _temporary_account_failure_cooldown_minutes() * 60 + elapsed_seconds = (now - last_attempt_at).total_seconds() + if elapsed_seconds < cooldown_seconds: + remaining_seconds = int(cooldown_seconds - elapsed_seconds) + logger.info( + "Account %s temporary account failure category=%s cooling down for %ss", + user.get("username") or user.get("unique_id") or "unknown", + category, + remaining_seconds, + ) + return True + + logger.info( + "Account %s temporary account failure category=%s cooldown expired; allowing retry", + user.get("username") or user.get("unique_id") or "unknown", + category, + ) + return False + + +def _friend_index_complete_today(user, now): + meta = dict(user.get("friend_index_meta") or {}) + last_scan_at = _parse_sent_at(meta.get("lastScanAt"), now.tzinfo) + return bool(last_scan_at and last_scan_at.date() == now.date() and meta.get("lastScanComplete")) + + def _target_failure_attempts_today(user, target_name, now): queue = dict(user.get("failure_queue") or {}) entry = queue.get(target_name) or {} @@ -441,6 +1293,15 @@ def _target_failure_attempts_today(user, target_name, now): def _pending_failed_targets(user, now): queue = dict(user.get("failure_queue") or {}) targets = [] + account_failure = _account_failure_entry_today(user, now) + account_failure_targets = list(account_failure.get("affectedTargets") or []) + if account_failure_targets: + for target_name in account_failure_targets: + if target_name in (user.get("targets") or []) and not _target_sent_today(user, target_name, now): + targets.append(target_name) + if targets: + return targets + for target_name in user.get("targets") or []: if _target_sent_today(user, target_name, now): continue @@ -514,6 +1375,7 @@ def _select_due_targets(user, send_window, now): already_sent = [] pending_targets = [] queued_failures = [] + account_paused = _account_paused_by_failure_today(user, now) for target_name in targets: if _target_sent_today(user, target_name, now): already_sent.append(target_name) @@ -522,10 +1384,19 @@ def _select_due_targets(user, send_window, now): queued_failures.append(target_name) continue scheduled_at = _scheduled_send_time(user, target_name, send_window, now) - if now >= scheduled_at: + if account_paused: + pending_targets.append((target_name, scheduled_at)) + elif now >= scheduled_at: due_targets.append(target_name) else: pending_targets.append((target_name, scheduled_at)) + if account_paused and (pending_targets or queued_failures): + logger.info( + "Account %s is paused by account-level failure attempts=%s threshold=%s", + user.get("username") or user.get("unique_id") or "unknown", + _account_failure_attempts_today(user, now), + _account_failure_pause_after_attempts(), + ) return due_targets, already_sent, pending_targets, queued_failures @@ -648,6 +1519,121 @@ def _find_matching_account(accounts, user): return None +def _coerce_attempt_count(entry): + try: + return int(dict(entry or {}).get("attemptCount") or 0) + except (TypeError, ValueError): + return 0 + + +def _persist_account_send_failure(user, category, reason, attempted_at, affected_targets=None): + accounts = get_userData(force_reload=True) + matched_account = _find_matching_account(accounts, user) + if matched_account is None: + logger.warning( + "Could not find account to persist account-level browser failure for user=%s", + user.get("username", "unknown"), + ) + return + + affected_targets = list(affected_targets or []) + existing_entry = dict(matched_account.get("account_failure") or {}) + entry = { + "category": category, + "reason": reason, + "firstAttemptAt": existing_entry.get("firstAttemptAt") or attempted_at, + "lastAttemptAt": attempted_at, + "attemptCount": _coerce_attempt_count(existing_entry) + 1, + "lastRunMode": _current_run_mode(), + "affectedTargets": affected_targets, + } + failure_queue = dict(matched_account.get("failure_queue") or {}) + for target_name in affected_targets: + failure_queue.pop(target_name, None) + if failure_queue: + matched_account["failure_queue"] = failure_queue + else: + matched_account.pop("failure_queue", None) + matched_account["account_failure"] = entry + save_userData(accounts) + + user["account_failure"] = dict(entry) + user_queue = dict(user.get("failure_queue") or {}) + for target_name in affected_targets: + user_queue.pop(target_name, None) + if user_queue: + user["failure_queue"] = user_queue + else: + user.pop("failure_queue", None) + logger.warning( + "Paused browser sends for account %s category=%s affectedTargets=%s reason=%s", + matched_account.get("username", "unknown"), + category, + affected_targets, + reason, + ) + + +def _clear_account_send_failure(user): + accounts = get_userData(force_reload=True) + matched_account = _find_matching_account(accounts, user) + changed = False + if matched_account is not None and matched_account.pop("account_failure", None) is not None: + changed = True + if changed: + save_userData(accounts) + user.pop("account_failure", None) + + +def _persist_friend_index(user, friend_records, scanned_at, *, scan_complete, missing_targets=None): + accounts = get_userData(force_reload=True) + matched_account = _find_matching_account(accounts, user) + if matched_account is None: + logger.warning( + "Could not find account to persist friend index for user=%s", + user.get("username", "unknown"), + ) + return + + existing_index = dict(matched_account.get("friend_index") or {}) + for normalized_name, record in (friend_records or {}).items(): + entry = dict(existing_index.get(normalized_name) or {}) + entry.update( + { + "visibleName": record.get("visibleName") or "", + "normalizedName": normalized_name, + "stableKeys": list(record.get("stableKeys") or []), + "lastSeenAt": scanned_at, + } + ) + existing_index[normalized_name] = entry + + meta = { + "lastScanAt": scanned_at, + "lastScanComplete": bool(scan_complete), + "scannedCount": len(friend_records or {}), + "missingTargets": list(missing_targets or []), + } + matched_account["friend_index"] = existing_index + matched_account["friend_index_meta"] = meta + if scan_complete and friend_records: + matched_account.pop("account_failure", None) + save_userData(accounts) + + user["friend_index"] = dict(existing_index) + user["friend_index_meta"] = dict(meta) + if scan_complete and friend_records: + user.pop("account_failure", None) + + logger.info( + "Persisted friend index for %s scanned=%s complete=%s missingTargets=%s", + matched_account.get("username", "unknown"), + len(friend_records or {}), + bool(scan_complete), + list(missing_targets or []), + ) + + def _persist_browser_send_failure(user, target_name, message, category, reason, attempted_at): accounts = get_userData(force_reload=True) matched_account = _find_matching_account(accounts, user) @@ -667,7 +1653,7 @@ def _persist_browser_send_failure(user, target_name, message, category, reason, "message": message, "firstAttemptAt": existing_entry.get("firstAttemptAt") or attempted_at, "lastAttemptAt": attempted_at, - "attemptCount": int(existing_entry.get("attemptCount") or 0) + 1, + "attemptCount": _coerce_attempt_count(existing_entry) + 1, "lastRunMode": _current_run_mode(), } matched_account["failure_queue"] = queue @@ -709,6 +1695,7 @@ def _persist_browser_send_success(user, target_name, message, sent_at): matched_account["failure_queue"] = queue else: matched_account.pop("failure_queue", None) + matched_account.pop("account_failure", None) save_userData(accounts) user_history = dict(user.get("message_history") or {}) @@ -723,6 +1710,7 @@ def _persist_browser_send_success(user, target_name, message, sent_at): user["failure_queue"] = user_queue else: user.pop("failure_queue", None) + user.pop("account_failure", None) logger.info( "Persisted browser send history for %s/%s at %s", @@ -754,17 +1742,130 @@ def _split_sender_modes(active_config, runnable_user_data): return protocol_users, browser_users +def _pid_is_alive(pid): + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + +def _extract_lock_pid(raw): + for line in str(raw or "").splitlines(): + if line.startswith("pid="): + try: + return int(line.split("=", 1)[1].strip()) + except (TypeError, ValueError): + return None + try: + return int(str(raw or "").strip()) + except (TypeError, ValueError): + return None + + +def _browser_account_lock_is_stale(lock_path, raw): + pid = _extract_lock_pid(raw) + if pid is not None and not _pid_is_alive(pid): + return True, f"missing pid={pid}" + try: + age_seconds = datetime.now(timezone.utc).timestamp() - lock_path.stat().st_mtime + except OSError: + return True, "missing lock file" + if age_seconds > 7200: + return True, f"older than 7200s pid={pid}" + return False, "" + + +async def _acquire_browser_account_lock(user, account_name): + lock_dir = Path("logs/browser-account-locks") + lock_dir.mkdir(parents=True, exist_ok=True) + identity = _account_identity(user) or account_name + lock_path = lock_dir / f"{_safe_name(identity)}.lock" + started_at = asyncio.get_running_loop().time() + last_logged_at = 0 + + while True: + try: + handle = lock_path.open("x", encoding="utf-8") + handle.write( + f"pid={os.getpid()}\n" + f"account={account_name}\n" + f"createdAt={datetime.now(timezone.utc).isoformat(timespec='seconds')}\n" + ) + handle.flush() + logger.debug("Acquired browser account lock for %s at %s", account_name, lock_path) + return handle, lock_path + except FileExistsError: + raw = lock_path.read_text(encoding="utf-8", errors="ignore") + is_stale, stale_reason = _browser_account_lock_is_stale(lock_path, raw) + if is_stale: + logger.warning( + "Removing stale browser account lock for %s at %s: %s", + account_name, + lock_path, + stale_reason, + ) + try: + lock_path.unlink() + except FileNotFoundError: + pass + continue + + now = asyncio.get_running_loop().time() + if now - started_at > 7200: + raise RuntimeError(f"timed out waiting for browser account lock for {account_name}") + if now - last_logged_at >= 30: + logger.info( + "Waiting for existing browser account lock for %s at %s", + account_name, + lock_path, + ) + last_logged_at = now + await asyncio.sleep(5) + + +def _release_browser_account_lock(handle, lock_path, account_name): + try: + handle.close() + finally: + try: + lock_path.unlink() + logger.debug("Released browser account lock for %s at %s", account_name, lock_path) + except FileNotFoundError: + pass + + async def run_browser_tasks(active_config, browser_user_data): if not browser_user_data: return + send_strategy = _normalize_send_strategy(active_config) + friend_scan_config = _normalize_friend_list_scan_config(active_config) + profile_config = _normalize_persistent_profile_config(active_config) + semaphore = asyncio.Semaphore(active_config["taskCount"] if active_config["multiTask"] else 1) + tasks = [] + + if profile_config["enabled"]: + logger.info( + "Browser sender persistent profiles enabled root=%s seedCookiesWhenEmpty=%s syncStoredCookiesBeforeRun=%s refreshStoredCookiesAfterLogin=%s", + profile_config["root"], + profile_config["seedCookiesWhenEmpty"], + profile_config["syncStoredCookiesBeforeRun"], + profile_config["refreshStoredCookiesAfterLogin"], + ) + for user in browser_user_data: + logger.info("Using persistent browser sender for user=%s targets=%s", user.get("username", "unknown"), user["targets"]) + tasks.append(do_user_task(None, user, semaphore, send_strategy, profile_config, friend_scan_config)) + await asyncio.gather(*tasks) + return + playwright, browser = await get_browser() try: - semaphore = asyncio.Semaphore(active_config["taskCount"] if active_config["multiTask"] else 1) - tasks = [] for user in browser_user_data: logger.info("Using browser sender for user=%s targets=%s", user.get("username", "unknown"), user["targets"]) - tasks.append(do_user_task(browser, user, semaphore)) + tasks.append(do_user_task(browser, user, semaphore, send_strategy, profile_config, friend_scan_config)) await asyncio.gather(*tasks) finally: @@ -772,135 +1873,272 @@ async def run_browser_tasks(active_config, browser_user_data): await browser.close() -async def do_user_task(browser, user, semaphore): +async def do_user_task(browser, user, semaphore, send_strategy, profile_config, friend_scan_config): async with semaphore: account_name = user.get("username", "unknown") - cookies = user["cookies"] - targets = user["targets"] - context = await browser.new_context() - context.set_default_navigation_timeout(120000) - context.set_default_timeout(120000) - yielded_targets = set() - + account_lock_handle = None + account_lock_path = None try: - page = await context.new_page() - try: - await retry_operation( - "open creator home", - page.goto, - retries=3, - delay=5, - url="https://creator.douyin.com/", - ) + account_lock_handle, account_lock_path = await _acquire_browser_account_lock(user, account_name) + await _do_user_task_locked( + browser, + user, + send_strategy, + profile_config, + friend_scan_config, + account_name, + ) + finally: + if account_lock_handle is not None: + _release_browser_account_lock(account_lock_handle, account_lock_path, account_name) + + +async def _do_user_task_locked(browser, user, send_strategy, profile_config, friend_scan_config, account_name): + cookies = user["cookies"] + targets = user["targets"] + start_delay = _random_delay_seconds( + send_strategy, + "accountStartDelaySecondsMin", + "accountStartDelaySecondsMax", + ) + await _sleep_with_log(start_delay, "Delaying browser sender start", account_name) + + owned_playwright = None + profile_dir = None + if profile_config["enabled"]: + owned_playwright, context, profile_dir = await get_persistent_browser_context( + _account_profile_name(user), + root=profile_config["root"], + ) + logger.info("Opened persistent browser profile for %s at %s", account_name, profile_dir) + if profile_config["syncStoredCookiesBeforeRun"]: + await apply_stored_cookies_to_profile(context, cookies, account_name) + elif profile_config["seedCookiesWhenEmpty"]: + await apply_stored_cookies_to_profile(context, cookies, account_name, only_when_empty=True) + else: + context = await browser.new_context() + + context.set_default_navigation_timeout(120000) + context.set_default_timeout(120000) + yielded_targets = set() + page = None + + try: + page = await context.new_page() + try: + await retry_operation( + "open creator home", + page.goto, + retries=3, + delay=5, + url=CREATOR_HOME_URL, + ) + if not profile_config["enabled"]: await context.add_cookies(cookies) - await retry_operation( - "open chat page", - page.goto, - retries=3, - delay=5, - url="https://creator.douyin.com/creator-micro/data/following/chat", - ) - except Exception as exc: - attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") - category = classify_browser_failure("open_chat_page", exc) - reason = str(exc) - logger.exception("Account %s failed before target delivery", account_name) + await retry_operation( + "open chat page", + page.goto, + retries=3, + delay=5, + url=CREATOR_CHAT_URL, + ) + await asyncio.sleep(3) + await ensure_not_login_required(page, account_name, "open_chat_page") + if profile_config["enabled"] and profile_config["refreshStoredCookiesAfterLogin"]: + await refresh_stored_cookies_from_profile(context, user, account_name) + except Exception as exc: + attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") + category = classify_browser_failure("open_chat_page", exc) + reason = str(exc) + logger.exception("Account %s failed before target delivery", account_name) + if _is_account_level_failure_category(category): + _persist_account_send_failure(user, category, reason, attempted_at, targets) + else: for target_name in targets: _persist_browser_send_failure(user, target_name, "", category, reason, attempted_at) - return + return - logger.info("Account %s started the message flow", account_name) - try: - async for target_name in scroll_and_select_user(page, account_name, targets): - yielded_targets.add(target_name) - message = "" - chat_input = None - try: - await save_debug_artifacts(page, account_name, target_name, "selected-friend") - chat_input, selector_used = await locate_chat_input(page) - logger.info("Using chat input selector %s for %s/%s", selector_used, account_name, target_name) + logger.info("Account %s started the message flow", account_name) + try: + index_targets = targets + schedule_now = datetime.now(_schedule_timezone()) + if not _friend_index_complete_today(user, schedule_now): + index_targets = list(user.get("targets") or targets) + logger.info( + "Account %s will refresh today's friend index while delivering targets=%s", + account_name, + targets, + ) + async for target_name in scroll_and_select_user( + page, + user, + account_name, + targets, + friend_scan_config, + index_targets=index_targets, + ): + yielded_targets.add(target_name) + message = "" + chat_input = None + visible_message_count_before = None + try: + await save_debug_artifacts(page, account_name, target_name, "selected-friend") + chat_input, selector_used = await locate_chat_input(page) + logger.info("Using chat input selector %s for %s/%s", selector_used, account_name, target_name) - message = build_message() - logger.info("Prepared message for %s/%s: %r", account_name, target_name, message) + message = build_message() + logger.info("Prepared message for %s/%s: %r", account_name, target_name, message) + visible_message_count_before = await count_visible_message_matches( + page, + message, + chat_input=chat_input, + ) + logger.info( + "Visible message count before send for %s/%s: %s", + account_name, + target_name, + visible_message_count_before, + ) - lines = message.split("\n") - for index, line in enumerate(lines): - await chat_input.type(line, delay=50) - if index < len(lines) - 1: - await chat_input.press("Shift+Enter") + lines = message.split("\n") + for index, line in enumerate(lines): + await chat_input.type(line, delay=50) + if index < len(lines) - 1: + await chat_input.press("Shift+Enter") - await save_debug_artifacts(page, account_name, target_name, "typed-message") + await save_debug_artifacts(page, account_name, target_name, "typed-message") - logger.info("Pressing Enter to send message for %s/%s", account_name, target_name) - await chat_input.press("Enter") + logger.info("Pressing Enter to send message for %s/%s", account_name, target_name) + await chat_input.press("Enter") - sent_ok, detail = await confirm_message_sent(page, chat_input, message) - await save_debug_artifacts(page, account_name, target_name, "after-send") + sent_ok, detail = await confirm_message_sent( + page, + chat_input, + message, + before_visible_count=visible_message_count_before, + ) + await save_debug_artifacts(page, account_name, target_name, "after-send") - if not sent_ok: - raise RuntimeError(detail) + if not sent_ok: + raise RuntimeError(detail) - logger.info("Message send confirmed for %s/%s: %s", account_name, target_name, detail) + logger.info("Message send confirmed for %s/%s: %s", account_name, target_name, detail) + _persist_browser_send_success( + user, + target_name, + message, + datetime.now(timezone.utc).isoformat(timespec="seconds"), + ) + interval = _random_delay_seconds( + send_strategy, + "messageIntervalSecondsMin", + "messageIntervalSecondsMax", + ) + await _sleep_with_log(interval, "Delaying next browser message", account_name) + except Exception as exc: + sent_ok = False + detail = "" + if message: + sent_ok, detail = await detect_message_already_sent( + page, + chat_input, + message, + before_visible_count=visible_message_count_before, + ) + if sent_ok: + logger.warning( + "Recovered send outcome for %s/%s after failure: %s", + account_name, + target_name, + detail, + ) _persist_browser_send_success( user, target_name, message, datetime.now(timezone.utc).isoformat(timespec="seconds"), ) - except Exception as exc: - sent_ok = False - detail = "" - if message: - sent_ok, detail = await detect_message_already_sent(page, chat_input, message) - if sent_ok: - logger.warning( - "Recovered send outcome for %s/%s after failure: %s", - account_name, - target_name, - detail, - ) - _persist_browser_send_success( - user, - target_name, - message, - datetime.now(timezone.utc).isoformat(timespec="seconds"), - ) - continue + continue - logger.exception("Send flow failed for %s/%s", account_name, target_name) - await save_debug_artifacts(page, account_name, target_name, "send-error") - _persist_browser_send_failure( - user, - target_name, - message, - classify_browser_failure("send_flow", exc), - str(exc), - datetime.now(timezone.utc).isoformat(timespec="seconds"), + logger.exception("Send flow failed for %s/%s", account_name, target_name) + await save_debug_artifacts(page, account_name, target_name, "send-error") + category = classify_browser_failure("send_flow", exc) + reason = str(exc) + is_login_required, login_detail = await _page_has_login_required_prompt(page) + if is_login_required: + category = "login_required" + reason = f"{reason}\n{login_detail}" + logger.warning( + "Account %s hit login_required during send flow: %s", + account_name, + login_detail, ) - except Exception as exc: - remaining_targets = [target for target in targets if target not in yielded_targets] - attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") - category = classify_browser_failure("friend_list", exc) - reason = str(exc) - logger.exception("Target selection failed for %s", account_name) - for target_name in remaining_targets: - _persist_browser_send_failure(user, target_name, "", category, reason, attempted_at) - return - - missing_targets = [target for target in targets if target not in yielded_targets] - if missing_targets: - attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") - for target_name in missing_targets: + if _is_stop_account_category(category): + affected_targets = [ + target_name, + *[target for target in targets if target not in yielded_targets], + ] + attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") + _persist_account_send_failure( + user, + category, + reason, + attempted_at, + affected_targets, + ) + return _persist_browser_send_failure( user, target_name, - "", - "friend_not_found", - "target not selected from friend list", - attempted_at, + message, + category, + reason, + datetime.now(timezone.utc).isoformat(timespec="seconds"), ) - finally: - await context.close() + interval = _random_delay_seconds( + send_strategy, + "messageIntervalSecondsMin", + "messageIntervalSecondsMax", + ) + await _sleep_with_log(interval, "Delaying next browser message after failure", account_name) + except Exception as exc: + remaining_targets = [target for target in targets if target not in yielded_targets] + attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") + category = classify_browser_failure("friend_list", exc) + reason = str(exc) + is_login_required, login_detail = await _page_has_login_required_prompt(page) + if is_login_required: + category = "login_required" + reason = f"{reason}\n{login_detail}" + logger.exception("Target selection failed for %s", account_name) + if _is_account_level_failure_category(category): + _persist_account_send_failure(user, category, reason, attempted_at, remaining_targets) + else: + for target_name in remaining_targets: + _persist_browser_send_failure(user, target_name, "", category, reason, attempted_at) + return + + missing_targets = [target for target in targets if target not in yielded_targets] + if missing_targets: + attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds") + for target_name in missing_targets: + _persist_browser_send_failure( + user, + target_name, + "", + "friend_not_found", + "target not selected from friend list", + attempted_at, + ) + finally: + if page is not None: + try: + await page.close() + except Exception: + pass + await context.close() + if owned_playwright is not None: + await owned_playwright.stop() async def runTasks(): diff --git a/DouYinSparkFlow/scripts/start_login_desktop.sh b/DouYinSparkFlow/scripts/start_login_desktop.sh index 8af3d10..a98222f 100644 --- a/DouYinSparkFlow/scripts/start_login_desktop.sh +++ b/DouYinSparkFlow/scripts/start_login_desktop.sh @@ -16,6 +16,22 @@ pkill -f "websockify --web=/usr/share/novnc ${LOGIN_DESKTOP_WEB_PORT}" >/dev/nul rm -f "/tmp/.X99-lock" rm -f "/tmp/.X11-unix/X99" +run_forever() { + local name="$1" + shift + local log_file="/app/logs/login_desktop/${name}.log" + : > "${log_file}" + while true; do + printf '[%s] starting %s\n' "$(date -Is)" "${name}" >> "${log_file}" + set +e + "$@" >> "${log_file}" 2>&1 + local status=$? + set -e + printf '[%s] %s exited with status %s; restarting in 2s\n' "$(date -Is)" "${name}" "${status}" >> "${log_file}" + sleep 2 + done +} + Xvfb "${DISPLAY}" -screen 0 1600x1000x24 -ac +extension RANDR > /app/logs/login_desktop/xvfb.log 2>&1 & for _ in $(seq 1 30); do if [ -S /tmp/.X11-unix/X99 ]; then @@ -25,7 +41,7 @@ for _ in $(seq 1 30); do done fluxbox > /app/logs/login_desktop/fluxbox.log 2>&1 & -x11vnc -display "${DISPLAY}" -forever -shared -rfbport "${LOGIN_DESKTOP_VNC_PORT}" -nopw > /app/logs/login_desktop/x11vnc.log 2>&1 & -websockify --web=/usr/share/novnc "${LOGIN_DESKTOP_WEB_PORT}" "localhost:${LOGIN_DESKTOP_VNC_PORT}" > /app/logs/login_desktop/novnc.log 2>&1 & +run_forever x11vnc x11vnc -display "${DISPLAY}" -forever -shared -rfbport "${LOGIN_DESKTOP_VNC_PORT}" -localhost -nopw & +run_forever novnc websockify --web=/usr/share/novnc "${LOGIN_DESKTOP_WEB_PORT}" "127.0.0.1:${LOGIN_DESKTOP_VNC_PORT}" & exec python /app/login_desktop_server.py diff --git a/DouYinSparkFlow/utils/config.py b/DouYinSparkFlow/utils/config.py index fafc15a..f1de884 100644 --- a/DouYinSparkFlow/utils/config.py +++ b/DouYinSparkFlow/utils/config.py @@ -26,6 +26,13 @@ DEFAULT_CONFIG = { "useProtocolSender": True, "protocolDryRun": False, "browserSenderAccounts": [], + "persistentBrowserProfiles": { + "enabled": False, + "root": "/opt/douyin-sparkflow/state/browser-profiles", + "seedCookiesWhenEmpty": True, + "syncStoredCookiesBeforeRun": True, + "refreshStoredCookiesAfterLogin": True, + }, "sendStrategy": { "shuffleTargets": True, "accountStartDelaySecondsMin": 0, @@ -56,10 +63,13 @@ DEFAULT_APP_SETTINGS = { "ui_host": "0.0.0.0", "ui_port": 8787, "login_poll_interval_seconds": 1, - "ops_log_file": "/app/logs/douyin-sparkflow.log", + "ops_log_file": "/var/log/douyin-sparkflow.log", "proxy_refresh_script": "/opt/douyin-sparkflow/refresh_proxy.sh", "local_login_helper_url": "http://127.0.0.1:18765", "login_desktop_api_url": "http://127.0.0.1:18090", + "login_desktop_public_url": "", + "login_desktop_public_scheme": "http", + "login_desktop_public_port": 8788, "server_host": "", "server_username": "", "server_password": "", diff --git a/DouYinSparkFlow/webui/app.py b/DouYinSparkFlow/webui/app.py index 036ae5c..10330ca 100644 --- a/DouYinSparkFlow/webui/app.py +++ b/DouYinSparkFlow/webui/app.py @@ -1,6 +1,5 @@ import json import logging -import os import traceback from datetime import datetime, timedelta, timezone from pathlib import Path @@ -17,7 +16,7 @@ from starlette.middleware.sessions import SessionMiddleware logger = logging.getLogger(__name__) from core.friends import fetch_account_friends -from core.tasks import run_browser_tasks +from core.tasks import run_browser_tasks, task_run_lock from utils.config import ( get_app_settings, get_config, @@ -41,12 +40,15 @@ from webui.auth import ( verify_password, ) from webui.ops import ( + TASK_ALREADY_RUNNING, get_ops_snapshot, read_log_tail, refresh_proxy, restart_proxy, + run_failed_retry_now, run_task_now, run_unsent_retry_now, + task_run_lock_status, update_daily_schedule, ) @@ -133,13 +135,18 @@ def _target_sent_today(account, target_name): def login_desktop_api_url(): settings = get_app_settings(force_reload=True) - return str(os.getenv("SPARKFLOW_LOGIN_DESKTOP_API_URL") or settings.get("login_desktop_api_url") or "http://127.0.0.1:18090").rstrip("/") + return str(settings.get("login_desktop_api_url") or "http://127.0.0.1:18090").rstrip("/") def login_desktop_public_url(request: Request) -> str: + settings = get_app_settings(force_reload=True) + configured_url = str(settings.get("login_desktop_public_url") or "").strip() + if configured_url: + return configured_url + host = request.url.hostname or "127.0.0.1" - scheme = request.url.scheme or "http" - port = str(os.getenv("LOGIN_DESKTOP_PUBLIC_PORT") or "8788").strip() or "8788" + scheme = str(settings.get("login_desktop_public_scheme") or "http").strip() or "http" + port = coerce_int(settings.get("login_desktop_public_port"), 8788, minimum=1) return f"{scheme}://{host}:{port}/vnc.html?autoconnect=1&resize=scale&view_only=0" @@ -237,14 +244,6 @@ def create_app(): return redirect("/login") return None - def console_context(request): - return { - "flash": pop_flash(request), - "accounts": get_userData(force_reload=True), - "runtime_config": get_config(force_reload=True), - "ops": get_ops_snapshot(), - } - def flash(request, message, level="info"): request.session["flash"] = {"message": message, "level": level} @@ -267,7 +266,7 @@ def create_app(): @app.post("/bootstrap") async def bootstrap(request: Request): if is_bootstrapped(): - flash(request, "管理员账号已初始化,请直接登录。", "warning") + flash(request, "Admin login is already configured.", "warning") return redirect("/login") form = await request.form() @@ -275,17 +274,17 @@ def create_app(): password = str(form.get("password", "")) confirm = str(form.get("confirm_password", "")) if not password or password != confirm: - flash(request, "初始化失败,请输入一致的管理员密码。", "error") + flash(request, "Password setup failed. Please enter matching passwords.", "error") return redirect("/login") bootstrap_admin_password(password, username=username) - flash(request, "管理员账号已创建,请登录控制台。", "success") + flash(request, "Admin credentials created. Please log in.", "success") return redirect("/login") @app.post("/login") async def login_action(request: Request): if not is_bootstrapped(): - flash(request, "请先创建管理员密码。", "warning") + flash(request, "Create the admin password first.", "warning") return redirect("/login") form = await request.form() @@ -293,11 +292,11 @@ def create_app(): password = str(form.get("password", "")) settings = get_app_settings(force_reload=True) if username != settings["admin_username"] or not verify_password(password, settings["admin_password_hash"]): - flash(request, "用户名或密码不正确。", "error") + flash(request, "Invalid username or password.", "error") return redirect("/login") issue_session(request, username) - flash(request, "已登录控制台。", "success") + flash(request, "Signed in successfully.", "success") return redirect("/") @app.post("/logout") @@ -314,43 +313,12 @@ def create_app(): return render_template( request, "dashboard.html", - console_context(request), - ) - - @app.get("/login-workspace", response_class=HTMLResponse) - async def login_workspace_page(request: Request): - maybe_redirect = require_user(request) - if maybe_redirect: - return maybe_redirect - - return render_template( - request, - "login_workspace.html", - console_context(request), - ) - - @app.get("/accounts", response_class=HTMLResponse) - async def accounts_page(request: Request): - maybe_redirect = require_user(request) - if maybe_redirect: - return maybe_redirect - - return render_template( - request, - "accounts.html", - console_context(request), - ) - - @app.get("/settings", response_class=HTMLResponse) - async def settings_page(request: Request): - maybe_redirect = require_user(request) - if maybe_redirect: - return maybe_redirect - - return render_template( - request, - "settings.html", - console_context(request), + { + "flash": pop_flash(request), + "accounts": get_userData(force_reload=True), + "runtime_config": get_config(force_reload=True), + "ops": get_ops_snapshot(), + }, ) @app.get("/ops/send-console", response_class=HTMLResponse) @@ -388,11 +356,11 @@ def create_app(): account["targets"] = targets account["enabled"] = str(form.get("enabled", "")) == "on" save_userData(accounts) - flash(request, f"已更新账号 {account['username']}。", "success") + flash(request, f"Updated account {account['username']}.", "success") else: - flash(request, "未找到账号。", "error") + flash(request, "Account not found.", "error") - return redirect("/accounts") + return redirect("/") @app.post("/accounts/{unique_id}/toggle-enabled") async def toggle_account_enabled(request: Request, unique_id: str): @@ -407,8 +375,8 @@ def create_app(): accounts = get_userData(force_reload=True) account = find_account(accounts, unique_id) if not account: - flash(request, "未找到账号。", "error") - return redirect("/accounts") + flash(request, "Account not found.", "error") + return redirect("/") account["enabled"] = not is_account_enabled(account) save_userData(accounts) @@ -417,7 +385,7 @@ def create_app(): f"{account.get('username', 'Account')} 已{'启用' if account['enabled'] else '停用'}自动续火花。", "success", ) - return redirect("/accounts") + return redirect("/") @app.post("/accounts/{unique_id}/friends/refresh") async def refresh_account_friend_list(request: Request, unique_id: str): @@ -463,10 +431,10 @@ def create_app(): updated_accounts = [item for item in accounts if normalize_unique_id(item.get("unique_id")) != normalize_unique_id(unique_id)] if len(updated_accounts) != len(accounts): save_userData(updated_accounts) - flash(request, "账号已删除。", "success") + flash(request, "Account deleted.", "success") else: - flash(request, "未找到账号。", "error") - return redirect("/accounts") + flash(request, "Account not found.", "error") + return redirect("/") @app.post("/accounts/{unique_id}/retry-target") async def retry_account_target(request: Request, unique_id: str): @@ -480,13 +448,18 @@ def create_app(): target_name = str(form.get("target", "")).strip() if not target_name: - flash(request, "请选择需要重试的目标。", "error") + flash(request, "Target is required for retry.", "error") return redirect("/ops/send-console") accounts = get_userData(force_reload=True) account = find_account(accounts, unique_id) if not account: - flash(request, "未找到账号。", "error") + flash(request, "Account not found.", "error") + return redirect("/ops/send-console") + + lock_status = task_run_lock_status() + if lock_status.get("running"): + flash(request, "已有发送任务正在运行,本次单目标重试没有启动。请等当前任务结束后再试。", "warning") return redirect("/ops/send-console") account_copy = dict(account) @@ -495,18 +468,24 @@ def create_app(): config["taskCount"] = 1 try: - await run_browser_tasks(config, [account_copy]) + with task_run_lock(): + await run_browser_tasks(config, [account_copy]) except Exception as exc: - flash(request, f"{account.get('username', '账号')} / {target_name} 重试失败:{exc}", "error") + flash(request, f"Retry failed for {account.get('username', 'Account')} / {target_name}: {exc}", "error") return redirect("/ops/send-console") updated_account = find_account(get_userData(force_reload=True), unique_id) or {} if _target_sent_today(updated_account, target_name): - flash(request, f"{account.get('username', '账号')} / {target_name} 已重试成功。", "success") + flash(request, f"Retried {account.get('username', 'Account')} / {target_name} successfully.", "success") else: + account_failure = dict(updated_account.get("account_failure") or {}) + affected_targets = list(account_failure.get("affectedTargets") or []) failure_entry = dict(updated_account.get("failure_queue") or {}).get(target_name) or {} - reason = str(failure_entry.get("reason") or "重试未确认发送成功。") - flash(request, f"{account.get('username', '账号')} / {target_name} 重试未成功:{reason}", "error") + if target_name in affected_targets: + reason = str(account_failure.get("reason") or "Account-level browser failure.") + else: + reason = str(failure_entry.get("reason") or "Retry did not confirm a successful send.") + flash(request, f"Retry did not succeed for {account.get('username', 'Account')} / {target_name}: {reason}", "error") return redirect("/ops/send-console") @app.post("/config") @@ -572,8 +551,8 @@ def create_app(): config["happyNewYear"] = happy_new_year save_config(config) - flash(request, "运行配置已保存。", "success") - return redirect("/settings") + flash(request, "Runtime config saved.", "success") + return redirect("/") @app.post("/settings") async def save_panel_settings(request: Request): @@ -605,12 +584,12 @@ def create_app(): confirm_password = str(form.get("confirm_password", "")) if new_password: if new_password != confirm_password: - flash(request, "管理员密码未更新:两次输入不一致。", "error") - return redirect("/settings") + flash(request, "Admin password was not updated because the confirmation did not match.", "error") + return redirect("/") update_admin_password(new_password) - flash(request, "面板与服务设置已保存。", "success") - return redirect("/settings") + flash(request, "Panel settings saved.", "success") + return redirect("/") @app.post("/ops/run-now") async def run_now(request: Request): @@ -623,14 +602,35 @@ def create_app(): return Response("Invalid CSRF token", status_code=403) pid = run_task_now() - if pid == -1: - flash(request, "补发全部对象启动失败,请查看服务日志。", "error") + if pid == TASK_ALREADY_RUNNING: + flash(request, "已有发送任务正在运行,本次补发全部对象没有启动。请等当前任务结束后再试。", "warning") + elif pid == -1: + flash(request, "Failed to start the full resend run. Check server logs for details.", "error") else: - flash(request, f"已启动补发全部对象任务(pid {pid})。", "success") + flash(request, f"已启动补发全部对象后台任务(pid {pid})。这只表示任务已启动,实际成功数请刷新发送控制台查看。", "info") + return redirect("/") + + @app.post("/ops/run-failed") + async def run_failed_retry(request: Request): + maybe_redirect = require_user(request) + if maybe_redirect: + return maybe_redirect + + form = await request.form() + if not validate_csrf(request, str(form.get("csrf_token", ""))): + return Response("Invalid CSRF token", status_code=403) + + pid = run_failed_retry_now() + if pid == TASK_ALREADY_RUNNING: + flash(request, "已有发送任务正在运行,本次补发未成功目标没有启动。请等当前任务结束后再试。", "warning") + elif pid == -1: + flash(request, "Failed to start the failed-target retry run. Check server logs for details.", "error") + else: + flash(request, f"已启动补发未成功目标后台任务(pid {pid})。这只表示任务已启动,实际成功数请刷新发送控制台查看。", "info") return redirect("/ops/send-console") @app.post("/ops/run-unsent") - async def run_unsent(request: Request): + async def run_unsent_retry(request: Request): maybe_redirect = require_user(request) if maybe_redirect: return maybe_redirect @@ -640,10 +640,12 @@ def create_app(): return Response("Invalid CSRF token", status_code=403) pid = run_unsent_retry_now() - if pid == -1: - flash(request, "补发未成功目标启动失败,请查看服务日志。", "error") + if pid == TASK_ALREADY_RUNNING: + flash(request, "A send task is already running; unsent retry was not started.", "warning") + elif pid == -1: + flash(request, "Failed to start the unsent-target retry run. Check server logs for details.", "error") else: - flash(request, f"已启动补发未成功目标任务(pid {pid})。", "success") + flash(request, f"Started unsent-target retry background task (pid {pid}). Refresh the send console for results.", "info") return redirect("/ops/send-console") @app.post("/ops/proxy/refresh") @@ -657,8 +659,8 @@ def create_app(): return Response("Invalid CSRF token", status_code=403) refresh_proxy() - flash(request, "代理订阅已刷新。", "success") - return redirect("/settings") + flash(request, "Proxy subscription refreshed.", "success") + return redirect("/") @app.post("/ops/proxy/restart") async def proxy_restart(request: Request): @@ -671,8 +673,8 @@ def create_app(): return Response("Invalid CSRF token", status_code=403) restart_proxy() - flash(request, "代理容器已重启。", "success") - return redirect("/settings") + flash(request, "Proxy container restarted.", "success") + return redirect("/") @app.post("/ops/schedule") async def save_schedule(request: Request): @@ -687,10 +689,10 @@ def create_app(): time_string = str(form.get("daily_schedule", "")).strip() result = update_daily_schedule(time_string) if getattr(result, "returncode", 1) == 0: - flash(request, f"发送窗口已更新为 {time_string}。", "success") + flash(request, f"Updated the daily schedule to {time_string}.", "success") else: - flash(request, f"发送窗口更新失败:{getattr(result, 'stderr', '')}", "error") - return redirect("/settings") + flash(request, f"Failed to update the daily schedule to {time_string}: {getattr(result, 'stderr', '')}", "error") + return redirect("/") @app.get("/ops/logs", response_class=HTMLResponse) async def logs_page(request: Request): diff --git a/DouYinSparkFlow/webui/ops.py b/DouYinSparkFlow/webui/ops.py index 18fc996..625ab83 100644 --- a/DouYinSparkFlow/webui/ops.py +++ b/DouYinSparkFlow/webui/ops.py @@ -6,6 +6,7 @@ import re import shlex import subprocess import sys +import unicodedata from datetime import datetime, timedelta, timezone from pathlib import Path from zoneinfo import ZoneInfo @@ -14,6 +15,8 @@ from utils.config import get_app_settings, get_config, get_userData, normalize_u logger = logging.getLogger(__name__) +TASK_ALREADY_RUNNING = -2 + TASK_SCHEDULE_MARKERS = ( "docker compose run --rm task", "docker compose run --rm douyin", @@ -61,6 +64,54 @@ def compose_command(*args): return base +def _pid_is_alive(pid): + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + return True + + +def _parse_lock_pid(raw): + try: + return int(str(raw or "").strip().splitlines()[0]) + except (IndexError, TypeError, ValueError): + return None + + +def task_run_lock_status(): + lock_path = repo_root() / "logs" / "task.run.lock" + if not lock_path.exists(): + return {"running": False, "path": str(lock_path), "pid": None, "ageSeconds": 0, "staleRemoved": False} + + raw = lock_path.read_text(encoding="utf-8", errors="ignore") + pid = _parse_lock_pid(raw) + try: + age_seconds = max(0, int(datetime.now(timezone.utc).timestamp() - lock_path.stat().st_mtime)) + except OSError: + return {"running": False, "path": str(lock_path), "pid": pid, "ageSeconds": 0, "staleRemoved": False} + + if pid is not None and not _pid_is_alive(pid): + try: + lock_path.unlink() + logger.warning("Removed stale task run lock owned by missing pid=%s", pid) + return {"running": False, "path": str(lock_path), "pid": pid, "ageSeconds": age_seconds, "staleRemoved": True} + except FileNotFoundError: + return {"running": False, "path": str(lock_path), "pid": pid, "ageSeconds": age_seconds, "staleRemoved": True} + + if pid is None and age_seconds > 7200: + try: + lock_path.unlink() + logger.warning("Removed stale unreadable task run lock contents=%r", raw[:80]) + return {"running": False, "path": str(lock_path), "pid": None, "ageSeconds": age_seconds, "staleRemoved": True} + except FileNotFoundError: + return {"running": False, "path": str(lock_path), "pid": None, "ageSeconds": age_seconds, "staleRemoved": True} + + return {"running": True, "path": str(lock_path), "pid": pid, "ageSeconds": age_seconds, "staleRemoved": False} + + def build_task_run_spec(): if running_in_container(): return [sys.executable, "main.py", "--doTask"], repo_root() @@ -88,20 +139,22 @@ def _compose_env_args(extra_env=None): return " ".join(shlex.quote(part) for part in parts) -def _ops_log_file(): - return str(get_app_settings().get("ops_log_file") or "/app/logs/douyin-sparkflow.log") - - def build_scheduled_task_command(extra_env=None, trigger_label="scheduled send"): if running_in_container(): task_command = _with_env_prefix("python main.py --doTask", extra_env) - repo_root_quoted = shlex.quote(str(repo_root())) - script = ( - "timestamp=$(date -Iseconds); " + return ( + "/bin/bash -lc 'timestamp=$(date -Iseconds); " f"echo \"[AUTO_TRIGGER] $timestamp {trigger_label} start\"; " - f"cd {repo_root_quoted} && {task_command}" + "container=$(docker ps --format \"{{.Names}}\" | " + "grep -E \"^(douyin-web-hostfix|douyin-web)$\" | head -n 1); " + "if [ -z \"$container\" ]; then " + "echo \"[AUTO_TRIGGER] $timestamp no matching container found\"; " + "exit 1; " + "fi; " + "echo \"[AUTO_TRIGGER] $timestamp container=$container\"; " + "docker exec \"$container\" sh -lc " + f"\"cd /app && {task_command}\"'" ) - return f"/bin/bash -lc {shlex.quote(script)}" if compose_file_path(): compose_root_quoted = shlex.quote(str(compose_root())) compose_env_args = _compose_env_args(extra_env) @@ -236,15 +289,26 @@ def get_task_container_rows(): return [] -def run_task_now(*, unsent_only=False): +def run_task_now(*, unsent_only=False, failed_only=False): try: - log_file = Path(_ops_log_file()) + lock_status = task_run_lock_status() + if lock_status.get("running"): + logger.info( + "Refusing to start manual task because task lock is active pid=%s age=%ss", + lock_status.get("pid"), + lock_status.get("ageSeconds"), + ) + return TASK_ALREADY_RUNNING + + log_file = Path(get_app_settings().get("ops_log_file") or "/var/log/douyin-sparkflow.log") command, cwd = build_task_run_spec() run_env = { "SPARKFLOW_MANUAL_RUN": "1", "PYTHONUNBUFFERED": "1", } - if unsent_only: + if failed_only: + run_env["SPARKFLOW_MANUAL_FAILED_ONLY"] = "1" + elif unsent_only: run_env["SPARKFLOW_MANUAL_UNSENT_ONLY"] = "1" return run_background_command( command, @@ -259,6 +323,10 @@ def run_task_now(*, unsent_only=False): return -1 +def run_failed_retry_now(): + return run_task_now(failed_only=True) + + def run_unsent_retry_now(): return run_task_now(unsent_only=True) @@ -283,7 +351,7 @@ def restart_proxy(): def read_log_tail(lines=200): - log_path = Path(_ops_log_file()) + log_path = Path(get_app_settings().get("ops_log_file") or "/var/log/douyin-sparkflow.log") if not log_path.exists(): return "" content = log_path.read_text(encoding="utf-8", errors="replace").splitlines() @@ -348,7 +416,6 @@ def replace_douyin_cron_schedule(crontab_text, time_string): schedule = parse_schedule_string(time_string) scheduled_command = build_scheduled_task_command() fallback_command = build_unsent_fallback_task_command() - log_redirect = f" >> {shlex.quote(_ops_log_file())} 2>&1" updated = [] for raw_line in crontab_text.splitlines(): @@ -360,20 +427,20 @@ def replace_douyin_cron_schedule(crontab_text, time_string): if schedule["mode"] == "window": updated.append( f"*/{schedule['scheduleIntervalMinutes']} {schedule['startHour']}-{schedule['endHour'] - 1} * * * " - f"{scheduled_command}{log_redirect}" + f"{scheduled_command} >> /var/log/douyin-sparkflow.log 2>&1" ) updated.append( f"0 {schedule['endHour']} * * * " - f"{scheduled_command}{log_redirect}" + f"{scheduled_command} >> /var/log/douyin-sparkflow.log 2>&1" ) updated.append( f"{schedule['scheduleIntervalMinutes']} {schedule['endHour']} * * * " - f"{fallback_command}{log_redirect}" + f"{fallback_command} >> /var/log/douyin-sparkflow.log 2>&1" ) else: updated.append( f"{schedule['minute']} {schedule['hour']} * * * " - f"{scheduled_command}{log_redirect}" + f"{scheduled_command} >> /var/log/douyin-sparkflow.log 2>&1" ) normalized = "\n".join(line for line in updated if line.strip()) @@ -484,6 +551,70 @@ def _account_identity(user): return str(user.get("unique_id") or user.get("username") or "unknown").strip() +def _coerce_attempt_count(entry): + try: + return int(dict(entry or {}).get("attemptCount") or 0) + except (TypeError, ValueError): + return 0 + + +def _account_failure_pause_after_attempts(): + raw_value = str(os.getenv("SPARKFLOW_ACCOUNT_FAILURE_PAUSE_AFTER_ATTEMPTS") or "2").strip() + try: + return max(1, int(raw_value)) + except ValueError: + return 2 + + +def _account_failure_entry_today(account, now): + entry = dict(account.get("account_failure") or {}) + last_attempt_at = _parse_sent_at(entry.get("lastAttemptAt"), now.tzinfo) + if last_attempt_at and last_attempt_at.date() == now.date(): + entry["lastAttemptAt"] = last_attempt_at.isoformat(timespec="seconds") + first_attempt_at = _parse_sent_at(entry.get("firstAttemptAt"), now.tzinfo) + if first_attempt_at: + entry["firstAttemptAt"] = first_attempt_at.isoformat(timespec="seconds") + entry["attemptCount"] = _coerce_attempt_count(entry) + entry["affectedTargets"] = list(entry.get("affectedTargets") or []) + return entry + return {} + + +def _normalize_friend_index_key(value): + raw = unicodedata.normalize("NFKC", str(value or "")) + for token in ("\u200b", "\u200c", "\u200d", "\ufeff"): + raw = raw.replace(token, "") + raw = raw.replace("\xa0", " ") + return " ".join(raw.split()).strip() + + +def _friend_index_status(account, target_name): + friend_index = dict(account.get("friend_index") or {}) + entry = dict(friend_index.get(_normalize_friend_index_key(target_name)) or {}) + return { + "seen": bool(entry), + "visibleName": str(entry.get("visibleName") or ""), + "stableKeys": list(entry.get("stableKeys") or []), + "lastSeenAt": str(entry.get("lastSeenAt") or ""), + } + + +def _account_blocked_target_status(item, account_failure): + blocked_item = dict(item) + affected_targets = set(account_failure.get("affectedTargets") or []) + blocked_item.update( + { + "status": "account_blocked", + "category": str(account_failure.get("category") or ""), + "reason": str(account_failure.get("reason") or ""), + "attemptCount": _coerce_attempt_count(account_failure), + "lastAttemptAt": str(account_failure.get("lastAttemptAt") or ""), + "accountFailureAffected": blocked_item.get("target") in affected_targets, + } + ) + return blocked_item + + def _scheduled_send_time(user, target_name, send_window, now): window_minutes = max(1, (send_window["endHour"] - send_window["startHour"]) * 60) start_of_window = now.replace( @@ -501,6 +632,7 @@ def _scheduled_send_time(user, target_name, send_window, now): def _build_target_status(account, target_name, now, send_window): history = dict(account.get("message_history") or {}) failure_queue = dict(account.get("failure_queue") or {}) + friend_index = _friend_index_status(account, target_name) history_entry = history.get(target_name) or {} sent_at = _parse_sent_at(history_entry.get("sentAt"), now.tzinfo) @@ -515,6 +647,7 @@ def _build_target_status(account, target_name, now, send_window): "reason": "", "attemptCount": 0, "scheduledAt": "", + "friendIndex": friend_index, } failure_entry = failure_queue.get(target_name) or {} @@ -530,6 +663,7 @@ def _build_target_status(account, target_name, now, send_window): "reason": str(failure_entry.get("reason") or ""), "attemptCount": int(failure_entry.get("attemptCount") or 0), "scheduledAt": "", + "friendIndex": friend_index, } scheduled_at = None @@ -546,6 +680,7 @@ def _build_target_status(account, target_name, now, send_window): "reason": "", "attemptCount": 0, "scheduledAt": scheduled_at.isoformat(timespec="seconds"), + "friendIndex": friend_index, } return { @@ -558,6 +693,7 @@ def _build_target_status(account, target_name, now, send_window): "reason": "", "attemptCount": 0, "scheduledAt": scheduled_at.isoformat(timespec="seconds") if scheduled_at else "", + "friendIndex": friend_index, } @@ -568,35 +704,83 @@ def get_send_console_snapshot(): summary = { "enabled_accounts": len(accounts), + "total_targets": 0, "today_sent_targets": 0, "today_failed_targets": 0, "today_pending_targets": 0, "today_unprocessed_targets": 0, + "today_account_blocked_targets": 0, + "today_remaining_targets": 0, + "today_account_failures": 0, + "today_account_paused": 0, } account_rows = [] + account_failure_pause_after = _account_failure_pause_after_attempts() for account in accounts: - statuses = [_build_target_status(account, target_name, now, send_window) for target_name in account.get("targets") or []] + configured_targets = list(account.get("targets") or []) + statuses = [_build_target_status(account, target_name, now, send_window) for target_name in configured_targets] sent_targets = [item for item in statuses if item["status"] == "sent"] failed_targets = [item for item in statuses if item["status"] == "failed"] - pending_targets = [item for item in statuses if item["status"] == "pending"] - unprocessed_targets = [item for item in statuses if item["status"] == "unprocessed"] + account_failure = _account_failure_entry_today(account, now) + account_paused = bool(account_failure and _coerce_attempt_count(account_failure) >= account_failure_pause_after) + account_blocked_targets = [] + if account_paused: + account_blocked_targets = [ + _account_blocked_target_status(item, account_failure) + for item in statuses + if item["status"] in {"pending", "unprocessed"} + ] + pending_targets = [] + unprocessed_targets = [] + else: + pending_targets = [item for item in statuses if item["status"] == "pending"] + unprocessed_targets = [item for item in statuses if item["status"] == "unprocessed"] + friend_index_meta = dict(account.get("friend_index_meta") or {}) + friend_index_last_scan_at = _parse_sent_at(friend_index_meta.get("lastScanAt"), now.tzinfo) + if friend_index_last_scan_at: + friend_index_meta["lastScanAt"] = friend_index_last_scan_at.isoformat(timespec="seconds") + friend_index_meta["missingTargets"] = list(friend_index_meta.get("missingTargets") or []) + friend_index_meta["lastScanComplete"] = bool(friend_index_meta.get("lastScanComplete")) + try: + friend_index_meta["scannedCount"] = int(friend_index_meta.get("scannedCount") or 0) + except (TypeError, ValueError): + friend_index_meta["scannedCount"] = 0 + summary["total_targets"] += len(configured_targets) summary["today_sent_targets"] += len(sent_targets) summary["today_failed_targets"] += len(failed_targets) summary["today_pending_targets"] += len(pending_targets) summary["today_unprocessed_targets"] += len(unprocessed_targets) + summary["today_account_blocked_targets"] += len(account_blocked_targets) + summary["today_remaining_targets"] += ( + len(failed_targets) + + len(pending_targets) + + len(unprocessed_targets) + + len(account_blocked_targets) + ) + if account_failure: + summary["today_account_failures"] += 1 + if account_paused: + summary["today_account_paused"] += 1 account_rows.append( { "unique_id": str(account.get("unique_id") or ""), "username": account.get("username") or "", + "total_targets": len(configured_targets), "sent_targets": sent_targets, "failed_targets": failed_targets, "pending_targets": pending_targets, "unprocessed_targets": unprocessed_targets, + "account_blocked_targets": account_blocked_targets, "last_failure_reason": failed_targets[0]["reason"] if failed_targets else "", "failure_queue": dict(account.get("failure_queue") or {}), + "account_failure": account_failure, + "account_paused": account_paused, + "account_failure_pause_after": account_failure_pause_after, + "friend_index_meta": friend_index_meta, + "friend_index_count": len(dict(account.get("friend_index") or {})), } ) @@ -633,6 +817,7 @@ def get_ops_snapshot(): "containers": get_container_status(), "task_containers": get_task_container_rows(), "send_console": get_send_console_snapshot(), + "task_lock": task_run_lock_status(), "daily_schedule": current_daily_schedule(), "crontab": read_crontab(), "log_tail": read_log_tail(120), diff --git a/DouYinSparkFlow/webui/templates/base.html b/DouYinSparkFlow/webui/templates/base.html index e2b13b2..caeb04c 100644 --- a/DouYinSparkFlow/webui/templates/base.html +++ b/DouYinSparkFlow/webui/templates/base.html @@ -1,148 +1,521 @@ {% set current_nav %}{% block nav_key %}dashboard{% endblock %}{% endset %} - + - - - {% block title %}自动续火花{% endblock %} - + + + {% block title %}续火花{% endblock %} +
-
- -
- - 自动续火花 -
- -
- - - -