Add unsent fallback retry workflow

This commit is contained in:
Rixuan Shao
2026-05-29 18:00:47 +08:00
parent 849731442a
commit 11a6c949db
6 changed files with 253 additions and 19 deletions

View File

@@ -3,6 +3,9 @@
"taskCount": 1,
"proxyAddress": "",
"messageTemplate": "🤩今日火花+1\r\n",
"saveDebugArtifacts": false,
"useProtocolSender": false,
"protocolDryRun": false,
"browserSenderAccounts": [
"94262577168",
"抖音号softwomen"
@@ -36,4 +39,4 @@
"enabled": true,
"messageTemplate": "\r\n"
}
}
}

View File

@@ -48,6 +48,10 @@ def _normalize_target_name(value):
def _current_run_mode():
if _manual_run_unsent_only():
return "manual_unsent_only"
if _manual_run_failed_only():
return "manual_failed_only"
return "manual" if _is_manual_run() else "scheduled"
@@ -196,8 +200,26 @@ async def scroll_and_select_user(page, account_name, targets):
}
found_usernames = set()
remaining_targets = set(normalized_targets)
scan_started_at = asyncio.get_running_loop().time()
last_new_friend_at = scan_started_at
max_scan_seconds = 300
idle_scan_seconds = 120
def missing_target_names():
return sorted(normalized_targets[item] for item in remaining_targets)
while True:
now_monotonic = asyncio.get_running_loop().time()
if now_monotonic - scan_started_at > max_scan_seconds:
logger.warning(
"Account %s friend list scan timed out after %ss. Missing targets: %s; scannedFriends=%s",
account_name,
max_scan_seconds,
missing_target_names(),
len(found_usernames),
)
return
target_elements = await page.locator(target_selector).all()
for element in target_elements:
@@ -214,6 +236,7 @@ async def scroll_and_select_user(page, account_name, targets):
if normalized_target_name in found_usernames:
continue
found_usernames.add(normalized_target_name)
last_new_friend_at = asyncio.get_running_loop().time()
logger.debug("Account %s found friend entry %s", account_name, target_name)
matched_target_name = normalized_targets.get(normalized_target_name)
@@ -239,7 +262,18 @@ async def scroll_and_select_user(page, account_name, targets):
logger.warning(
"Account %s reached the end of the friend list. Missing targets: %s",
account_name,
sorted(normalized_targets[item] for item in remaining_targets),
missing_target_names(),
)
return
now_monotonic = asyncio.get_running_loop().time()
if found_usernames and now_monotonic - last_new_friend_at > idle_scan_seconds:
logger.warning(
"Account %s friend list scan made no progress for %ss. Missing targets: %s; scannedFriends=%s",
account_name,
idle_scan_seconds,
missing_target_names(),
len(found_usernames),
)
return
@@ -317,6 +351,19 @@ def _manual_run_failed_only():
return _is_manual_run() and os.getenv("SPARKFLOW_MANUAL_FAILED_ONLY") == "1"
def _manual_run_unsent_only():
return _is_manual_run() and os.getenv("SPARKFLOW_MANUAL_UNSENT_ONLY") == "1"
def _unsent_retry_max_attempts():
raw_value = str(os.getenv("SPARKFLOW_UNSENT_RETRY_MAX_ATTEMPTS") or "3").strip()
try:
return max(1, int(raw_value))
except ValueError:
logger.warning("Invalid SPARKFLOW_UNSENT_RETRY_MAX_ATTEMPTS=%r, using 3", raw_value)
return 3
def _target_sent_today(user, target_name, now):
history = dict(user.get("message_history") or {})
entry = history.get(target_name) or {}
@@ -331,6 +378,18 @@ def _target_failed_today(user, target_name, now):
return bool(last_attempt_at and last_attempt_at.date() == now.date())
def _target_failure_attempts_today(user, target_name, now):
queue = dict(user.get("failure_queue") or {})
entry = queue.get(target_name) or {}
last_attempt_at = _parse_sent_at(entry.get("lastAttemptAt"), now.tzinfo)
if not last_attempt_at or last_attempt_at.date() != now.date():
return 0
try:
return int(entry.get("attemptCount") or 0)
except (TypeError, ValueError):
return 0
def _pending_failed_targets(user, now):
queue = dict(user.get("failure_queue") or {})
targets = []
@@ -342,6 +401,21 @@ def _pending_failed_targets(user, now):
return targets
def _pending_unsent_targets(user, now):
retry_targets = []
skipped_targets = []
max_attempts = _unsent_retry_max_attempts()
for target_name in user.get("targets") or []:
if _target_sent_today(user, target_name, now):
continue
attempts_today = _target_failure_attempts_today(user, target_name, now)
if attempts_today >= max_attempts:
skipped_targets.append(f"{target_name}({attempts_today})")
continue
retry_targets.append(target_name)
return retry_targets, skipped_targets
def _scheduled_send_time(user, target_name, send_window, now):
window_minutes = (send_window["endHour"] - send_window["startHour"]) * 60
start_of_window = now.replace(
@@ -373,7 +447,8 @@ def _select_due_targets(user, send_window, now):
second=0,
microsecond=0,
)
if now < window_start or now > window_end:
window_grace_end = window_end + timedelta(minutes=send_window["scheduleIntervalMinutes"])
if now < window_start or now > window_grace_end:
already_sent = []
pending_targets = []
queued_failures = []
@@ -430,6 +505,29 @@ def _prepare_active_users_for_run(active_config, active_user_data):
logger.info("No queued failures are pending for manual retry")
return runnable_users
if _manual_run_unsent_only():
logger.info(
"SPARKFLOW_MANUAL_RUN=1 and SPARKFLOW_MANUAL_UNSENT_ONLY=1, retrying today's unsent targets only"
)
runnable_users = []
for user in active_user_data:
retry_targets, skipped_targets = _pending_unsent_targets(user, now)
already_sent = [target for target in user.get("targets") or [] if _target_sent_today(user, target, now)]
logger.info(
"manual-unsent user=%s retryTargets=%s alreadySentToday=%s skippedMaxAttempts=%s",
user.get("username", "unknown"),
retry_targets,
already_sent,
skipped_targets,
)
if retry_targets:
runnable_user = dict(user)
runnable_user["targets"] = retry_targets
runnable_users.append(runnable_user)
if not runnable_users:
logger.info("No unsent targets are pending for manual retry")
return runnable_users
if _is_manual_run():
logger.info("SPARKFLOW_MANUAL_RUN=1, bypassing daily send window")
return [dict(user, targets=list(user.get("targets") or [])) for user in active_user_data]

View File

@@ -39,7 +39,15 @@ from webui.auth import (
validate_csrf,
verify_password,
)
from webui.ops import get_ops_snapshot, read_log_tail, refresh_proxy, restart_proxy, run_task_now, update_daily_schedule
from webui.ops import (
get_ops_snapshot,
read_log_tail,
refresh_proxy,
restart_proxy,
run_task_now,
run_unsent_retry_now,
update_daily_schedule,
)
BASE_DIR = Path(__file__).resolve().parent
@@ -580,6 +588,23 @@ def create_app():
flash(request, f"Triggered a full resend run in the background (pid {pid}).", "success")
return redirect("/")
@app.post("/ops/run-unsent")
async def run_unsent(request: Request):
maybe_redirect = require_user(request)
if maybe_redirect:
return maybe_redirect
form = await request.form()
if not validate_csrf(request, str(form.get("csrf_token", ""))):
return Response("Invalid CSRF token", status_code=403)
pid = run_unsent_retry_now()
if pid == -1:
flash(request, "Failed to start the unsent-target fallback run. Check server logs for details.", "error")
else:
flash(request, f"Triggered an unsent-target fallback run in the background (pid {pid}).", "success")
return redirect("/ops/send-console")
@app.post("/ops/proxy/refresh")
async def proxy_refresh(request: Request):
maybe_redirect = require_user(request)

View File

@@ -69,11 +69,31 @@ def build_task_run_spec():
return [sys.executable, "main.py", "--doTask"], repo_root()
def build_scheduled_task_command():
def _env_shell_prefix(extra_env=None):
parts = []
for key, value in (extra_env or {}).items():
parts.append(f"{key}={shlex.quote(str(value))}")
return " ".join(parts)
def _with_env_prefix(command, extra_env=None):
env_prefix = _env_shell_prefix(extra_env)
return f"env {env_prefix} {command}" if env_prefix else command
def _compose_env_args(extra_env=None):
parts = []
for key, value in (extra_env or {}).items():
parts.extend(["-e", f"{key}={value}"])
return " ".join(shlex.quote(part) for part in parts)
def build_scheduled_task_command(extra_env=None, trigger_label="scheduled send"):
if running_in_container():
task_command = _with_env_prefix("python main.py --doTask", extra_env)
return (
"/bin/bash -lc 'timestamp=$(date -Iseconds); "
"echo \"[AUTO_TRIGGER] $timestamp scheduled send start\"; "
f"echo \"[AUTO_TRIGGER] $timestamp {trigger_label} start\"; "
"container=$(docker ps --format \"{{.Names}}\" | "
"grep -E \"^(douyin-web-hostfix|douyin-web)$\" | head -n 1); "
"if [ -z \"$container\" ]; then "
@@ -82,21 +102,35 @@ def build_scheduled_task_command():
"fi; "
"echo \"[AUTO_TRIGGER] $timestamp container=$container\"; "
"docker exec \"$container\" sh -lc "
"\"cd /app && python main.py --doTask\"'"
f"\"cd /app && {task_command}\"'"
)
if compose_file_path():
compose_root_quoted = shlex.quote(str(compose_root()))
compose_env_args = _compose_env_args(extra_env)
compose_env_suffix = f" {compose_env_args}" if compose_env_args else ""
return (
"/bin/bash -lc "
f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) compose task start\"; "
f"cd {compose_root_quoted} && /usr/bin/docker compose run --rm task'"
f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) compose {trigger_label} start\"; "
f"cd {compose_root_quoted} && /usr/bin/docker compose run --rm{compose_env_suffix} task'"
)
repo_root_quoted = shlex.quote(str(repo_root()))
python_quoted = shlex.quote(sys.executable)
task_command = _with_env_prefix(f"{python_quoted} main.py --doTask", extra_env)
return (
"/bin/bash -lc "
f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) local task start\"; "
f"cd {repo_root_quoted} && {python_quoted} main.py --doTask'"
f"'echo \"[AUTO_TRIGGER] $(date -Iseconds) local {trigger_label} start\"; "
f"cd {repo_root_quoted} && {task_command}'"
)
def build_unsent_fallback_task_command():
return build_scheduled_task_command(
{
"SPARKFLOW_MANUAL_RUN": "1",
"SPARKFLOW_MANUAL_UNSENT_ONLY": "1",
"PYTHONUNBUFFERED": "1",
},
trigger_label="unsent fallback",
)
@@ -204,18 +238,21 @@ def get_task_container_rows():
return []
def run_task_now():
def run_task_now(*, unsent_only=False):
try:
log_file = Path(get_app_settings().get("ops_log_file") or "/var/log/douyin-sparkflow.log")
command, cwd = build_task_run_spec()
run_env = {
"SPARKFLOW_MANUAL_RUN": "1",
"PYTHONUNBUFFERED": "1",
}
if unsent_only:
run_env["SPARKFLOW_MANUAL_UNSENT_ONLY"] = "1"
return run_background_command(
command,
log_file,
cwd=cwd,
env={
"SPARKFLOW_MANUAL_RUN": "1",
"PYTHONUNBUFFERED": "1",
},
env=run_env,
)
except Exception as exc:
import traceback
@@ -224,6 +261,10 @@ def run_task_now():
return -1
def run_unsent_retry_now():
return run_task_now(unsent_only=True)
def refresh_proxy():
try:
script = Path(get_app_settings().get("proxy_refresh_script") or "")
@@ -308,6 +349,7 @@ def validate_time_string(time_string):
def replace_douyin_cron_schedule(crontab_text, time_string):
schedule = parse_schedule_string(time_string)
scheduled_command = build_scheduled_task_command()
fallback_command = build_unsent_fallback_task_command()
updated = []
for raw_line in crontab_text.splitlines():
@@ -325,6 +367,10 @@ def replace_douyin_cron_schedule(crontab_text, time_string):
f"0 {schedule['endHour']} * * * "
f"{scheduled_command} >> /var/log/douyin-sparkflow.log 2>&1"
)
updated.append(
f"{schedule['scheduleIntervalMinutes']} {schedule['endHour']} * * * "
f"{fallback_command} >> /var/log/douyin-sparkflow.log 2>&1"
)
else:
updated.append(
f"{schedule['minute']} {schedule['hour']} * * * "

View File

@@ -7,6 +7,10 @@
{% block topbar_actions %}
<a class="ghost-button" href="/ops/send-console">✦ 发送控制台</a>
<form method="post" action="/ops/run-unsent">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button class="soft-button" type="submit">补发未成功目标</button>
</form>
<form method="post" action="/ops/run-now">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button type="submit">✈ 补发全部对象</button>
@@ -404,6 +408,17 @@
padding: 9px 10px;
}
.friend-option input[type="checkbox"] {
position: static;
inset: auto;
width: 16px;
height: 16px;
opacity: 1;
pointer-events: auto;
accent-color: var(--primary);
flex: 0 0 auto;
}
.friend-option.selected {
background: var(--primary-soft);
border-color: rgba(47, 103, 255, 0.12);
@@ -713,10 +728,10 @@
<span>启用自动续火花</span>
</label>
<label>
<span>手动目标好友(每行一个)</span>
<textarea name="targets" rows="5">{{ account.targets|default([], true)|join('\n') }}</textarea>
<span>目标好友(每行一个,可手动编辑或从下方勾选</span>
<textarea class="targets-textarea" name="targets" rows="5">{{ account.targets|default([], true)|join('\n') }}</textarea>
</label>
<p class="muted compact">如果好友过多无法滚动读取,可直接在这里填写目标昵称并保存</p>
<p class="muted compact">未在好友缓存中的昵称也会保留在目标列表里</p>
<div class="friend-picker"
data-account-id="{{ account.unique_id }}"
@@ -866,6 +881,10 @@
</div>
</div>
<div class="button-grid">
<form method="post" action="/ops/run-unsent">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button class="soft-button" type="submit">补发未成功目标</button>
</form>
<form method="post" action="/ops/run-now">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button type="submit">补发全部对象</button>
@@ -1164,6 +1183,9 @@
const summaryEl = picker.querySelector(".friend-picker-summary");
const statusEl = picker.querySelector(".friend-picker-status");
const hiddenInputsEl = picker.querySelector(".friend-selected-inputs");
const formEl = picker.closest("form");
const targetsTextarea = formEl?.querySelector(".targets-textarea");
const currentTargetsEl = picker.querySelector(".friend-picker-current-targets span");
let friends = parseJsonScript(`friends-cache-${accountId}`);
let selected = new Set(parseJsonScript(`selected-targets-${accountId}`));
@@ -1179,6 +1201,31 @@
return merged;
};
const splitTargetText = (value) => {
const seen = new Set();
return String(value || "")
.replaceAll(",", "\n")
.split(/\r?\n/)
.map((name) => name.trim())
.filter((name) => {
if (!name || seen.has(name)) return false;
seen.add(name);
return true;
});
};
const syncTextareaFromSelected = () => {
if (targetsTextarea) {
targetsTextarea.value = [...selected].join("\n");
}
};
const syncSelectedFromTextarea = () => {
if (targetsTextarea) {
selected = new Set(splitTargetText(targetsTextarea.value));
}
};
const renderHiddenInputs = () => {
hiddenInputsEl.innerHTML = "";
[...selected].forEach((name) => {
@@ -1192,6 +1239,9 @@
const updateSummary = () => {
summaryEl.textContent = `已选 ${selected.size}`;
if (currentTargetsEl) {
currentTargetsEl.textContent = selected.size ? [...selected].join("、") : "未选择";
}
};
const renderList = () => {
@@ -1229,6 +1279,7 @@
selected.delete(value);
option?.classList.remove("selected");
}
syncTextareaFromSelected();
renderHiddenInputs();
updateSummary();
});
@@ -1264,6 +1315,13 @@
});
searchInput.addEventListener("input", renderList);
if (targetsTextarea) {
targetsTextarea.addEventListener("input", () => {
syncSelectedFromTextarea();
renderList();
});
}
syncSelectedFromTextarea();
renderList();
});
})();

View File

@@ -7,6 +7,10 @@
{% block topbar_actions %}
<a class="ghost-button" href="/">⌂ 返回首页</a>
<form method="post" action="/ops/run-unsent">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button class="soft-button" type="submit">补发未成功目标</button>
</form>
<form method="post" action="/ops/run-now">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button type="submit">✈ 补发全部对象</button>