Sync remote sparkflow changes and add send console

This commit is contained in:
Rixuan Shao
2026-05-17 22:48:30 +08:00
parent de4f3aeb74
commit 9e3cc85215
7 changed files with 967 additions and 83 deletions

View File

@@ -1,6 +1,6 @@
MIT License
Copyright (c) 2026 2061360308
Copyright (c) 2026 2061360308 盧瞳
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@@ -1,18 +1,137 @@
# DouYinSparkFlow
# DouYin Spark Flow
这里是核心应用源码目录,包含:
![Python](https://img.shields.io/badge/Python-3.8%2B-blue?logo=python)
![Playwright](https://img.shields.io/badge/Playwright-%E2%9C%94-green?logo=playwright)
![chrome-headless-shell](https://img.shields.io/badge/chrome--headless--shell-%E2%9C%94-brightgreen?logo=googlechrome)
- `core/`: 任务执行、协议发送、浏览器自动化等核心逻辑
- `webui/`: Web 管理界面与相关后端处理
- `utils/`: 配置、日志和通用辅助逻辑
- `scripts/`: 运行和登录辅助脚本
## 🎉 2026 正月限定(除夕至正月十五)
## 本地开发入口
除夕当天到正月十五期间,可开启祝福模式,每日向好友发送与当日相关的祝福语。
- 安装依赖:`requirements.txt` / `requirements-web.txt`
- 应用入口:`main.py`
- 容器构建参考:`Dockerfile.server`
### 启用方法
运行时账号数据、Web 管理设置、浏览器缓存和日志文件不随仓库提供,需要在目标环境中自行生成
拉取最新代码后,将 `config.json``happyNewYear` 下的 `enabled` 设为 `true`
仓库级说明、部署结构和敏感文件约定请查看上级目录的 [README.md](../README.md)。
`happyNewYear.messageTemplate` 为正月祝福模板,支持以下占位符:
1. `[API]`:祝福语
2. `[data]`:公历日期,例如 2026年02月16日
3. `[data_lunar]`:农历日期,例如 农历除夕、正月初一、正月初二 等
## 交流讨论
已开放讨论区,有疑问或展示相关成果,发布话题需求的可以加入讨论
[跳转讨论区](https://github.com/2061360308/DouYinSparkFlow/discussions)
## 📌 简单介绍
**抖音火花自动续火脚本**一款轻量实用的抖音互动脚本,可自动为你和抖音好友续火花,无需手动操作。
✅ 支持 GitHub Actions 自动运行(开箱即用的 Workflow 配置)
✅ 也可部署至自有服务器,灵活适配个人使用场景
### 特性
- [x] 多用户,同时批量支持多个账户
- [x] 多目标,一个账户支持多个续火花目标
- [x] 一言支持,更丰富的消息文本
使用`PlayWright`以及`chrome-headless-shell`自动化操作[抖音创作者中心](https://creator.douyin.com/),进行定时发送抖音消息来续火花
## 🚀 使用方法
### 1. 克隆项目到本地,并完成环境配置
```shell
pip install -r requirements.txt
cp usersData.example.json usersData.json
```
### 2. 运行main.py
首次运行`python main.py`会自动下载需要的测试浏览器默认从Mozilla的镜像站下载需要保证网络通畅。
![main.py运行截图](docs/images/屏幕截图%202026-02-14%20223607.png)
### 3. 登录用户
运行main.py后会弹出可选择的项目这时选择添加用户登录你可以选择添加多个用户。具体操作方式根据提示在弹出窗口扫码登录抖音创作者中心即可登录成功后你需要根据提示查看对应联系人的名称并在控制台输入。
### 4. 更改配置
你可以选择更改config.json中的配置目前proxyAddress的代理设置还没有实现。其他项目解释如下
|名称|作用解释|期望值|
|-----|-----|-----|
|multiTask|是否启用多任务,登录多个账户后生效,启用后同时操作多个账户的任务加快执行速度|`true` `false`|
|taskCount|最大同时操作的账户数目需要先启用multiTask|int默认`5`|
|messageTemplate|发送消息的模板,可以从抖音聊天框编辑好后直接复制过来,这样可以拿到简单表情的代码,例如`[盖瑞]`|使用`[API]`引用每日一言内容 默认值为: `[盖瑞]今日火花[加一]\n—— [右边] 每日一言 [左边] ——\n[API]`|
|hitokotoTypes|每日一言消息允许的类型|可以留空使用所有类型`[]`,全部可选类型的列表为:`["动画","漫画","游戏","文学","原创","来自网络","影视","诗词","哲学","抖机灵","其他"]`|
### 5. 测试运行
再次运行main.py之后选择`3.本地运行任务`,查看是否能够正常执行任务
### 6. Github Acion部署
项目可以部署到Github Action每日定时触发在测试完毕后你需要将本地代码推送到自己的Github仓库你也可以选择直接克隆本仓库后续将config.json同步即可如果你更改了设置的话。本地通过usersData.json存储已经登录的账户凭证为了防止信息泄露Action不能像本地那样从明文读取这个配置也不要将这个文件上传到Github正确做法是将内容存放到`secrets`
> 方法: 在你的Github仓库下操作选择settings->Environments在下面新建一个`user-data`环境,继续在这个`user-data`环境的Environment secrets添加名为`USER_DATA`的项目
![创建`user-data`环境图](docs/images/屏幕截图%202026-02-14%20224915.png)
关于这个配置的内容可以再次运行main.py,选择`2. 获取Github Action配置`将对应输出内容填入`USER_DATA`的值即可
![填写配置内容图](docs/images/屏幕截图%202026-02-14%20224951.png)
### 7. 可选手动触发Action进行测试
仓库的工作流中添加了`workflow_dispatch`以便允许进行手动触发在初次配置完成后可以通过手动触发Action来进行验证。
![手动测试](docs/images/屏幕截图%202026-02-14%20224614.png)
## 💬 问题解答
1. 首次**克隆仓库**后启用Action
**解答:**
克隆后Github Action 默认在新仓库中是关闭的。你需要在克隆仓库后,手动进入你的 Github 仓库页面,依次点击 `Actions` 选项卡首次进入会看到“启用工作流”或“Enable workflows”按钮点击即可激活仓库中的 Action 工作流。
启用后,工作流会根据 `.github/workflows` 目录下的配置自动运行。你可以通过手动触发workflow_dispatch或等待定时任务自动执行。
> 注意:首次启用后建议手动运行一次,确保配置无误。
2. 运行一段时间后Github提示仓库太久没有新活动定时Action被禁用
> 通常提示Scheduled workflows disabled
To reduce unnecessary workflow runs, scheduled workflows have been disabled in this repository because it has been more than 60 days since the last commit.
![Scheduled workflows disabled
To reduce unnecessary workflow runs, scheduled workflows have been disabled in this repository because it has been more than 60 days since the last commit.](docs/images/image.png)
**解答:**
这是 Github 的自动保护机制:如果仓库 60 天内没有任何提交或活动所有定时schedule类型的 Action 会被自动禁用,防止资源浪费。
遇到这种情况,只需在仓库内进行一次代码提交(如修改 README 或随便提交一个空白更改),然后重新进入 Actions 页面点击提示条上的“Enable workflow”或“启用工作流”按钮即可恢复定时任务。
恢复后,定时 Action 会重新按照 workflow 文件的 schedule 设定自动运行。
建议:如果你长期需要定时任务,定期(如每月)做一次无关紧要的提交,防止被自动禁用。
> 补充,我在仓库中尝试引入了`liskin/gh-workflow-keepalive`理论上在此之后复刻仓库的或者进行同步后的仓库不需要再手动保活具体详见action的`workflow-keepalive` Job
## ⚠️ 免责声明
1. 本项目为**开源学习用途**,仅用于技术研究和个人自用,严禁用于商业用途、恶意刷量或违反抖音平台规则的行为。
2. 使用本脚本产生的一切风险(包括但不限于抖音账号限流、封禁、处罚等)均由使用者自行承担,项目开发者不承担任何责任。
3. 本项目仅调用公开的接口/模拟人工操作,不涉及破解、入侵抖音系统,使用者需遵守《抖音用户服务协议》及相关法律法规。
4. 请合理控制脚本运行频率,避免给抖音平台服务器造成压力,建议仅用于个人少量好友的火花维系。
5. 若你使用本项目即表示已阅读并同意本免责声明,如不同意请立即停止使用。
## 📄 开源协议
本项目基于 MIT 协议开源,你可以自由使用、修改和分发本项目代码,详见 [LICENSE](LICENSE) 文件。

View File

@@ -2,6 +2,7 @@ import asyncio
import hashlib
import logging
import os
import unicodedata
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
@@ -38,6 +39,18 @@ def _safe_name(value):
return "".join(ch if ch.isalnum() or ch in ("-", "_") else "_" for ch in value)[:80]
def _normalize_target_name(value):
raw = unicodedata.normalize("NFKC", str(value or ""))
for token in ("\u200b", "\u200c", "\u200d", "\ufeff"):
raw = raw.replace(token, "")
raw = raw.replace("\xa0", " ")
return " ".join(raw.split()).strip()
def _current_run_mode():
return "manual" if _is_manual_run() else "scheduled"
async def save_debug_artifacts(page, account_name, target_name, stage):
if not get_config(force_reload=True).get("saveDebugArtifacts", False):
return
@@ -103,6 +116,56 @@ async def confirm_message_sent(page, chat_input, message):
return False, f"chat input still contains: {input_text!r}"
async def detect_message_already_sent(page, chat_input, message):
try:
if chat_input is not None:
sent_ok, detail = await confirm_message_sent(page, chat_input, message)
if sent_ok:
return True, detail
except Exception:
pass
first_line = message.split("\n")[0].strip()
if not first_line:
return False, ""
try:
bubble = page.locator(f"text={first_line}").last
if await bubble.count() > 0:
return True, "message bubble located after failure"
except Exception:
pass
return False, ""
def classify_browser_failure(stage, exc):
detail = str(exc or "")
lowered = detail.lower()
if "page crashed" in lowered or "target page, context or browser has been closed" in lowered:
return "page_crashed"
if "timeout" in lowered:
if stage in {"open_creator_home", "open_chat_page"}:
return "navigation_timeout"
if stage == "locate_chat_input":
return "chat_input_timeout"
if stage == "friend_list":
return "friend_list_timeout"
return "timeout"
if "unable to locate chat input" in lowered:
return "chat_input_not_found"
if "could not find the friend list scroll container" in lowered:
return "friend_list_container_missing"
if "chat input still contains" in lowered:
return "send_unconfirmed"
if "missing targets" in lowered:
return "friend_not_found"
if stage in {"open_creator_home", "open_chat_page"}:
return "navigation_failed"
return "unknown"
async def scroll_and_select_user(page, account_name, targets):
friends_tab_selector = 'xpath=//*[@id="sub-app"]/div/div/div[1]/div[2]'
target_selector = (
@@ -126,8 +189,13 @@ async def scroll_and_select_user(page, account_name, targets):
await page.locator(first_friend_selector).click()
await asyncio.sleep(2)
normalized_targets = {
_normalize_target_name(target): str(target)
for target in targets
if _normalize_target_name(target)
}
found_usernames = set()
remaining_targets = set(targets)
remaining_targets = set(normalized_targets)
while True:
target_elements = await page.locator(target_selector).all()
@@ -139,24 +207,40 @@ async def scroll_and_select_user(page, account_name, targets):
except Exception:
continue
if target_name in found_usernames:
normalized_target_name = _normalize_target_name(target_name)
if not normalized_target_name:
continue
found_usernames.add(target_name)
if normalized_target_name in found_usernames:
continue
found_usernames.add(normalized_target_name)
logger.debug("Account %s found friend entry %s", account_name, target_name)
if target_name in targets:
matched_target_name = normalized_targets.get(normalized_target_name)
if matched_target_name:
await element.click()
logger.info("Account %s selected target friend %s", account_name, target_name)
yield target_name
if matched_target_name != target_name:
logger.info(
"Account %s normalized target %r matched visible friend %r",
account_name,
matched_target_name,
target_name,
)
yield matched_target_name
remaining_targets.discard(target_name)
remaining_targets.discard(normalized_target_name)
if not remaining_targets:
logger.info("Account %s found all target friends", account_name)
return
break
else:
if await page.locator(no_more_selector).count() > 0:
logger.warning("Account %s reached the end of the friend list. Missing targets: %s", account_name, sorted(remaining_targets))
logger.warning(
"Account %s reached the end of the friend list. Missing targets: %s",
account_name,
sorted(normalized_targets[item] for item in remaining_targets),
)
return
if await page.locator(loading_selector).count() > 0:
@@ -229,6 +313,10 @@ def _parse_sent_at(raw_value, local_tz):
return parsed.astimezone(local_tz)
def _manual_run_failed_only():
return _is_manual_run() and os.getenv("SPARKFLOW_MANUAL_FAILED_ONLY") == "1"
def _target_sent_today(user, target_name, now):
history = dict(user.get("message_history") or {})
entry = history.get(target_name) or {}
@@ -236,6 +324,24 @@ def _target_sent_today(user, target_name, now):
return bool(sent_at and sent_at.date() == now.date())
def _target_failed_today(user, target_name, now):
queue = dict(user.get("failure_queue") or {})
entry = queue.get(target_name) or {}
last_attempt_at = _parse_sent_at(entry.get("lastAttemptAt"), now.tzinfo)
return bool(last_attempt_at and last_attempt_at.date() == now.date())
def _pending_failed_targets(user, now):
queue = dict(user.get("failure_queue") or {})
targets = []
for target_name in user.get("targets") or []:
if _target_sent_today(user, target_name, now):
continue
if target_name in queue and _target_failed_today(user, target_name, now):
targets.append(target_name)
return targets
def _scheduled_send_time(user, target_name, send_window, now):
window_minutes = (send_window["endHour"] - send_window["startHour"]) * 60
start_of_window = now.replace(
@@ -253,7 +359,7 @@ def _scheduled_send_time(user, target_name, send_window, now):
def _select_due_targets(user, send_window, now):
targets = list(user.get("targets") or [])
if not send_window.get("enabled") or _is_manual_run():
return targets, [], []
return targets, [], [], []
window_start = now.replace(
hour=send_window["startHour"],
@@ -268,24 +374,51 @@ def _select_due_targets(user, send_window, now):
microsecond=0,
)
if now < window_start or now > window_end:
return [], [], [(target, _scheduled_send_time(user, target, send_window, now)) for target in targets]
return [], [], [(target, _scheduled_send_time(user, target, send_window, now)) for target in targets], []
due_targets = []
already_sent = []
pending_targets = []
queued_failures = []
for target_name in targets:
if _target_sent_today(user, target_name, now):
already_sent.append(target_name)
continue
if _target_failed_today(user, target_name, now):
queued_failures.append(target_name)
continue
scheduled_at = _scheduled_send_time(user, target_name, send_window, now)
if now >= scheduled_at:
due_targets.append(target_name)
else:
pending_targets.append((target_name, scheduled_at))
return due_targets, already_sent, pending_targets
return due_targets, already_sent, pending_targets, queued_failures
def _prepare_active_users_for_run(active_config, active_user_data):
schedule_tz = _schedule_timezone()
now = datetime.now(schedule_tz)
if _manual_run_failed_only():
logger.info("SPARKFLOW_MANUAL_RUN=1, retrying queued failures only")
runnable_users = []
for user in active_user_data:
retry_targets = _pending_failed_targets(user, now)
already_sent = [target for target in user.get("targets") or [] if _target_sent_today(user, target, now)]
logger.info(
"manual-retry user=%s retryTargets=%s alreadySentToday=%s",
user.get("username", "unknown"),
retry_targets,
already_sent,
)
if retry_targets:
runnable_user = dict(user)
runnable_user["targets"] = retry_targets
runnable_users.append(runnable_user)
if not runnable_users:
logger.info("No queued failures are pending for manual retry")
return runnable_users
if _is_manual_run():
logger.info("SPARKFLOW_MANUAL_RUN=1, bypassing daily send window")
return [dict(user, targets=list(user.get("targets") or [])) for user in active_user_data]
@@ -294,8 +427,6 @@ def _prepare_active_users_for_run(active_config, active_user_data):
if not send_window.get("enabled"):
return [dict(user, targets=list(user.get("targets") or [])) for user in active_user_data]
schedule_tz = _schedule_timezone()
now = datetime.now(schedule_tz)
logger.info(
"dailySendWindow enabled startHour=%s endHour=%s intervalMinutes=%s timezone=%s now=%s",
send_window["startHour"],
@@ -307,17 +438,18 @@ def _prepare_active_users_for_run(active_config, active_user_data):
runnable_users = []
for user in active_user_data:
due_targets, already_sent, pending_targets = _select_due_targets(user, send_window, now)
due_targets, already_sent, pending_targets, queued_failures = _select_due_targets(user, send_window, now)
pending_preview = [
f"{target_name}@{scheduled_at.strftime('%H:%M')}"
for target_name, scheduled_at in pending_targets[:5]
]
logger.info(
"windowed user=%s dueTargets=%s alreadySentToday=%s pendingTargets=%s",
"windowed user=%s dueTargets=%s alreadySentToday=%s pendingTargets=%s queuedFailures=%s",
user.get("username", "unknown"),
due_targets,
already_sent,
pending_preview,
queued_failures,
)
if due_targets:
runnable_user = dict(user)
@@ -343,29 +475,67 @@ def _account_match_tokens(user):
return tokens
def _persist_browser_send_success(user, target_name, message, sent_at):
def _find_matching_account(accounts, user):
target_username = str(user.get("username") or "").strip()
target_unique_id = normalize_unique_id(user.get("unique_id"))
if not target_username and not target_unique_id:
logger.warning("Cannot persist browser send history without account identity for target=%s", target_name)
return
return None
accounts = get_userData(force_reload=True)
matched_account = None
for account in accounts:
account_username = str(account.get("username") or "").strip()
account_unique_id = normalize_unique_id(account.get("unique_id"))
if target_unique_id and account_unique_id == target_unique_id:
matched_account = account
break
return account
if target_username and account_username == target_username:
matched_account = account
break
return account
return None
def _persist_browser_send_failure(user, target_name, message, category, reason, attempted_at):
accounts = get_userData(force_reload=True)
matched_account = _find_matching_account(accounts, user)
if matched_account is None:
logger.warning(
"Could not find account to persist browser send failure for user=%s target=%s",
user.get("username", "unknown"),
target_name,
)
return
queue = dict(matched_account.get("failure_queue") or {})
existing_entry = dict(queue.get(target_name) or {})
queue[target_name] = {
"category": category,
"reason": reason,
"message": message,
"firstAttemptAt": existing_entry.get("firstAttemptAt") or attempted_at,
"lastAttemptAt": attempted_at,
"attemptCount": int(existing_entry.get("attemptCount") or 0) + 1,
"lastRunMode": _current_run_mode(),
}
matched_account["failure_queue"] = queue
save_userData(accounts)
user_queue = dict(user.get("failure_queue") or {})
user_queue[target_name] = dict(queue[target_name])
user["failure_queue"] = user_queue
logger.warning(
"Queued failed browser send for %s/%s category=%s reason=%s",
matched_account.get("username", "unknown"),
target_name,
category,
reason,
)
def _persist_browser_send_success(user, target_name, message, sent_at):
accounts = get_userData(force_reload=True)
matched_account = _find_matching_account(accounts, user)
if matched_account is None:
logger.warning(
"Could not find account to persist browser send history for user=%s target=%s",
target_username or target_unique_id or "unknown",
user.get("username", "unknown"),
target_name,
)
return
@@ -376,6 +546,12 @@ def _persist_browser_send_success(user, target_name, message, sent_at):
"sentAt": sent_at,
}
matched_account["message_history"] = history
queue = dict(matched_account.get("failure_queue") or {})
queue.pop(target_name, None)
if queue:
matched_account["failure_queue"] = queue
else:
matched_account.pop("failure_queue", None)
save_userData(accounts)
user_history = dict(user.get("message_history") or {})
@@ -384,6 +560,12 @@ def _persist_browser_send_success(user, target_name, message, sent_at):
"sentAt": sent_at,
}
user["message_history"] = user_history
user_queue = dict(user.get("failure_queue") or {})
user_queue.pop(target_name, None)
if user_queue:
user["failure_queue"] = user_queue
else:
user.pop("failure_queue", None)
logger.info(
"Persisted browser send history for %s/%s at %s",
@@ -441,63 +623,125 @@ async def do_user_task(browser, user, semaphore):
context = await browser.new_context()
context.set_default_navigation_timeout(120000)
context.set_default_timeout(120000)
yielded_targets = set()
try:
page = await context.new_page()
await retry_operation(
"open creator home",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/",
)
await context.add_cookies(cookies)
await retry_operation(
"open chat page",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/creator-micro/data/following/chat",
)
try:
await retry_operation(
"open creator home",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/",
)
await context.add_cookies(cookies)
await retry_operation(
"open chat page",
page.goto,
retries=3,
delay=5,
url="https://creator.douyin.com/creator-micro/data/following/chat",
)
except Exception as exc:
attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
category = classify_browser_failure("open_chat_page", exc)
reason = str(exc)
logger.exception("Account %s failed before target delivery", account_name)
for target_name in targets:
_persist_browser_send_failure(user, target_name, "", category, reason, attempted_at)
return
logger.info("Account %s started the message flow", account_name)
async for target_name in scroll_and_select_user(page, account_name, targets):
try:
await save_debug_artifacts(page, account_name, target_name, "selected-friend")
chat_input, selector_used = await locate_chat_input(page)
logger.info("Using chat input selector %s for %s/%s", selector_used, account_name, target_name)
try:
async for target_name in scroll_and_select_user(page, account_name, targets):
yielded_targets.add(target_name)
message = ""
chat_input = None
try:
await save_debug_artifacts(page, account_name, target_name, "selected-friend")
chat_input, selector_used = await locate_chat_input(page)
logger.info("Using chat input selector %s for %s/%s", selector_used, account_name, target_name)
message = build_message()
logger.info("Prepared message for %s/%s: %r", account_name, target_name, message)
message = build_message()
logger.info("Prepared message for %s/%s: %r", account_name, target_name, message)
lines = message.split("\n")
for index, line in enumerate(lines):
await chat_input.type(line, delay=50)
if index < len(lines) - 1:
await chat_input.press("Shift+Enter")
lines = message.split("\n")
for index, line in enumerate(lines):
await chat_input.type(line, delay=50)
if index < len(lines) - 1:
await chat_input.press("Shift+Enter")
await save_debug_artifacts(page, account_name, target_name, "typed-message")
await save_debug_artifacts(page, account_name, target_name, "typed-message")
logger.info("Pressing Enter to send message for %s/%s", account_name, target_name)
await chat_input.press("Enter")
logger.info("Pressing Enter to send message for %s/%s", account_name, target_name)
await chat_input.press("Enter")
sent_ok, detail = await confirm_message_sent(page, chat_input, message)
await save_debug_artifacts(page, account_name, target_name, "after-send")
sent_ok, detail = await confirm_message_sent(page, chat_input, message)
await save_debug_artifacts(page, account_name, target_name, "after-send")
if not sent_ok:
raise RuntimeError(detail)
if not sent_ok:
raise RuntimeError(detail)
logger.info("Message send confirmed for %s/%s: %s", account_name, target_name, detail)
_persist_browser_send_success(
logger.info("Message send confirmed for %s/%s: %s", account_name, target_name, detail)
_persist_browser_send_success(
user,
target_name,
message,
datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
except Exception as exc:
sent_ok = False
detail = ""
if message:
sent_ok, detail = await detect_message_already_sent(page, chat_input, message)
if sent_ok:
logger.warning(
"Recovered send outcome for %s/%s after failure: %s",
account_name,
target_name,
detail,
)
_persist_browser_send_success(
user,
target_name,
message,
datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
continue
logger.exception("Send flow failed for %s/%s", account_name, target_name)
await save_debug_artifacts(page, account_name, target_name, "send-error")
_persist_browser_send_failure(
user,
target_name,
message,
classify_browser_failure("send_flow", exc),
str(exc),
datetime.now(timezone.utc).isoformat(timespec="seconds"),
)
except Exception as exc:
remaining_targets = [target for target in targets if target not in yielded_targets]
attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
category = classify_browser_failure("friend_list", exc)
reason = str(exc)
logger.exception("Target selection failed for %s", account_name)
for target_name in remaining_targets:
_persist_browser_send_failure(user, target_name, "", category, reason, attempted_at)
return
missing_targets = [target for target in targets if target not in yielded_targets]
if missing_targets:
attempted_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
for target_name in missing_targets:
_persist_browser_send_failure(
user,
target_name,
message,
datetime.now(timezone.utc).isoformat(timespec="seconds"),
"",
"friend_not_found",
"target not selected from friend list",
attempted_at,
)
except Exception:
logger.exception("Send flow failed for %s/%s", account_name, target_name)
await save_debug_artifacts(page, account_name, target_name, "send-error")
raise
finally:
await context.close()

View File

@@ -1,7 +1,7 @@
import json
import logging
import traceback
from datetime import datetime
from datetime import datetime, timedelta, timezone
from pathlib import Path
import urllib.error
import urllib.request
@@ -16,6 +16,7 @@ from starlette.middleware.sessions import SessionMiddleware
logger = logging.getLogger(__name__)
from core.friends import fetch_account_friends
from core.tasks import run_browser_tasks
from utils.config import (
get_app_settings,
get_config,
@@ -96,6 +97,31 @@ def coerce_int(value, default, minimum=0):
return max(minimum, int(default))
def _schedule_timezone():
return timezone(timedelta(hours=8), name="Asia/Shanghai")
def _parse_sent_at(raw_value):
if not raw_value:
return None
raw = str(raw_value).strip()
if raw.endswith("Z"):
raw = raw[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(raw)
except ValueError:
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=_schedule_timezone())
return parsed.astimezone(_schedule_timezone())
def _target_sent_today(account, target_name):
entry = dict(account.get("message_history") or {}).get(target_name) or {}
sent_at = _parse_sent_at(entry.get("sentAt"))
return bool(sent_at and sent_at.date() == datetime.now(_schedule_timezone()).date())
def login_desktop_api_url():
settings = get_app_settings(force_reload=True)
return str(settings.get("login_desktop_api_url") or "http://127.0.0.1:18090").rstrip("/")
@@ -278,6 +304,21 @@ def create_app():
},
)
@app.get("/ops/send-console", response_class=HTMLResponse)
async def send_console_page(request: Request):
maybe_redirect = require_user(request)
if maybe_redirect:
return maybe_redirect
return render_template(
request,
"send_console.html",
{
"flash": pop_flash(request),
"ops": get_ops_snapshot(),
},
)
@app.post("/accounts/{unique_id}/update")
async def update_account(request: Request, unique_id: str):
maybe_redirect = require_user(request)
@@ -378,6 +419,47 @@ def create_app():
flash(request, "Account not found.", "error")
return redirect("/")
@app.post("/accounts/{unique_id}/retry-target")
async def retry_account_target(request: Request, unique_id: str):
maybe_redirect = require_user(request)
if maybe_redirect:
return maybe_redirect
form = await request.form()
if not validate_csrf(request, str(form.get("csrf_token", ""))):
return Response("Invalid CSRF token", status_code=403)
target_name = str(form.get("target", "")).strip()
if not target_name:
flash(request, "Target is required for retry.", "error")
return redirect("/ops/send-console")
accounts = get_userData(force_reload=True)
account = find_account(accounts, unique_id)
if not account:
flash(request, "Account not found.", "error")
return redirect("/ops/send-console")
account_copy = dict(account)
account_copy["targets"] = [target_name]
config = get_config(force_reload=True)
config["taskCount"] = 1
try:
await run_browser_tasks(config, [account_copy])
except Exception as exc:
flash(request, f"Retry failed for {account.get('username', 'Account')} / {target_name}: {exc}", "error")
return redirect("/ops/send-console")
updated_account = find_account(get_userData(force_reload=True), unique_id) or {}
if _target_sent_today(updated_account, target_name):
flash(request, f"Retried {account.get('username', 'Account')} / {target_name} successfully.", "success")
else:
failure_entry = dict(updated_account.get("failure_queue") or {}).get(target_name) or {}
reason = str(failure_entry.get("reason") or "Retry did not confirm a successful send.")
flash(request, f"Retry did not succeed for {account.get('username', 'Account')} / {target_name}: {reason}", "error")
return redirect("/ops/send-console")
@app.post("/config")
async def save_runtime_config(request: Request):
maybe_redirect = require_user(request)
@@ -493,9 +575,9 @@ def create_app():
pid = run_task_now()
if pid == -1:
flash(request, "Task launch failed. Check console logs for Missing Docker or protected log_file path.", "error")
flash(request, "Failed to start failed-target retry run. Check server logs for details.", "error")
else:
flash(request, f"Triggered a background task run (pid {pid}).", "success")
flash(request, f"Triggered a failed-target retry run in the background (pid {pid}).", "success")
return redirect("/")
@app.post("/ops/proxy/refresh")

View File

@@ -1,13 +1,16 @@
import json
import hashlib
import logging
import os
import re
import shlex
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
from utils.config import get_app_settings, get_config, repo_root, save_config
from utils.config import get_app_settings, get_config, get_userData, normalize_unique_id, repo_root, save_config
logger = logging.getLogger(__name__)
@@ -193,6 +196,7 @@ def run_task_now():
cwd=cwd,
env={
"SPARKFLOW_MANUAL_RUN": "1",
"SPARKFLOW_MANUAL_FAILED_ONLY": "1",
"PYTHONUNBUFFERED": "1",
},
)
@@ -375,6 +379,172 @@ def current_daily_schedule():
return ""
def _schedule_timezone():
timezone_name = (
str(os.getenv("SPARKFLOW_TIMEZONE") or "").strip()
or str(os.getenv("TZ") or "").strip()
or "Asia/Shanghai"
)
try:
return ZoneInfo(timezone_name)
except Exception:
if timezone_name == "Asia/Shanghai":
return timezone(timedelta(hours=8), name="Asia/Shanghai")
return datetime.now().astimezone().tzinfo
def _normalize_send_window():
raw = dict(get_config(force_reload=True).get("dailySendWindow") or {})
return {
"enabled": bool(raw.get("enabled", False)),
"startHour": int(raw.get("startHour", 10)),
"endHour": int(raw.get("endHour", 18)),
"scheduleIntervalMinutes": max(1, int(raw.get("scheduleIntervalMinutes", 10))),
}
def _parse_sent_at(raw_value, local_tz):
if not raw_value:
return None
raw = str(raw_value).strip()
if raw.endswith("Z"):
raw = raw[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(raw)
except ValueError:
return None
if parsed.tzinfo is None:
return parsed.replace(tzinfo=local_tz)
return parsed.astimezone(local_tz)
def _account_identity(user):
return str(user.get("unique_id") or user.get("username") or "unknown").strip()
def _scheduled_send_time(user, target_name, send_window, now):
window_minutes = max(1, (send_window["endHour"] - send_window["startHour"]) * 60)
start_of_window = now.replace(
hour=send_window["startHour"],
minute=0,
second=0,
microsecond=0,
)
seed = f"{now.date().isoformat()}|{_account_identity(user)}|{target_name}"
digest = hashlib.sha256(seed.encode("utf-8")).digest()
offset_minutes = int.from_bytes(digest[:8], "big") % window_minutes
return start_of_window + timedelta(minutes=offset_minutes)
def _build_target_status(account, target_name, now, send_window):
history = dict(account.get("message_history") or {})
failure_queue = dict(account.get("failure_queue") or {})
history_entry = history.get(target_name) or {}
sent_at = _parse_sent_at(history_entry.get("sentAt"), now.tzinfo)
if sent_at and sent_at.date() == now.date():
return {
"target": target_name,
"status": "sent",
"message": str(history_entry.get("message") or ""),
"sentAt": sent_at.isoformat(timespec="seconds"),
"lastAttemptAt": "",
"category": "",
"reason": "",
"attemptCount": 0,
"scheduledAt": "",
}
failure_entry = failure_queue.get(target_name) or {}
last_attempt_at = _parse_sent_at(failure_entry.get("lastAttemptAt"), now.tzinfo)
if last_attempt_at and last_attempt_at.date() == now.date():
return {
"target": target_name,
"status": "failed",
"message": str(failure_entry.get("message") or ""),
"sentAt": "",
"lastAttemptAt": last_attempt_at.isoformat(timespec="seconds"),
"category": str(failure_entry.get("category") or ""),
"reason": str(failure_entry.get("reason") or ""),
"attemptCount": int(failure_entry.get("attemptCount") or 0),
"scheduledAt": "",
}
scheduled_at = None
if send_window.get("enabled"):
scheduled_at = _scheduled_send_time(account, target_name, send_window, now)
if scheduled_at > now:
return {
"target": target_name,
"status": "pending",
"message": "",
"sentAt": "",
"lastAttemptAt": "",
"category": "",
"reason": "",
"attemptCount": 0,
"scheduledAt": scheduled_at.isoformat(timespec="seconds"),
}
return {
"target": target_name,
"status": "unprocessed",
"message": "",
"sentAt": "",
"lastAttemptAt": "",
"category": "",
"reason": "",
"attemptCount": 0,
"scheduledAt": scheduled_at.isoformat(timespec="seconds") if scheduled_at else "",
}
def get_send_console_snapshot():
accounts = [account for account in get_userData(force_reload=True) if account.get("enabled", True)]
send_window = _normalize_send_window()
now = datetime.now(_schedule_timezone())
summary = {
"enabled_accounts": len(accounts),
"today_sent_targets": 0,
"today_failed_targets": 0,
"today_pending_targets": 0,
"today_unprocessed_targets": 0,
}
account_rows = []
for account in accounts:
statuses = [_build_target_status(account, target_name, now, send_window) for target_name in account.get("targets") or []]
sent_targets = [item for item in statuses if item["status"] == "sent"]
failed_targets = [item for item in statuses if item["status"] == "failed"]
pending_targets = [item for item in statuses if item["status"] == "pending"]
unprocessed_targets = [item for item in statuses if item["status"] == "unprocessed"]
summary["today_sent_targets"] += len(sent_targets)
summary["today_failed_targets"] += len(failed_targets)
summary["today_pending_targets"] += len(pending_targets)
summary["today_unprocessed_targets"] += len(unprocessed_targets)
account_rows.append(
{
"unique_id": str(account.get("unique_id") or ""),
"username": account.get("username") or "",
"sent_targets": sent_targets,
"failed_targets": failed_targets,
"pending_targets": pending_targets,
"unprocessed_targets": unprocessed_targets,
"last_failure_reason": failed_targets[0]["reason"] if failed_targets else "",
"failure_queue": dict(account.get("failure_queue") or {}),
}
)
return {
"now": now.isoformat(timespec="seconds"),
"summary": summary,
"accounts": account_rows,
}
def _check_image_present():
"""Return True if the douyin-sparkflow:local image exists."""
try:
@@ -400,6 +570,7 @@ def get_ops_snapshot():
"compose_file": str(compose_file_path() or ""),
"containers": get_container_status(),
"task_containers": get_task_container_rows(),
"send_console": get_send_console_snapshot(),
"daily_schedule": current_daily_schedule(),
"crontab": read_crontab(),
"log_tail": read_log_tail(120),

View File

@@ -141,6 +141,74 @@
</div>
</section>
<section class="panel" id="send-console-summary">
<div class="section-title-row">
<div>
<h2>发送控制台摘要</h2>
<p class="muted compact">展示今天的成功、失败、待发送与待补发状态。失败目标可在详情页单独重试。</p>
</div>
<a class="link-button" href="/ops/send-console">打开发送控制台</a>
</div>
<div class="stats-grid" style="grid-template-columns: repeat(4, minmax(0, 1fr));">
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">启用账号</span>
<strong class="stat-value">{{ ops.send_console.summary.enabled_accounts }}</strong>
</div>
<div class="stat-icon blue">A</div>
</article>
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">今日成功目标</span>
<strong class="stat-value">{{ ops.send_console.summary.today_sent_targets }}</strong>
</div>
<div class="stat-icon green">OK</div>
</article>
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">失败待补发</span>
<strong class="stat-value">{{ ops.send_console.summary.today_failed_targets }}</strong>
</div>
<div class="stat-icon soft">!</div>
</article>
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">待发送 / 未处理</span>
<strong class="stat-value">{{ ops.send_console.summary.today_pending_targets + ops.send_console.summary.today_unprocessed_targets }}</strong>
</div>
<div class="stat-icon soft">...</div>
</article>
</div>
<div class="table-shell" style="margin-top: 16px;">
<table>
<thead>
<tr>
<th>账号</th>
<th>今日成功</th>
<th>失败待补发</th>
<th>待发送</th>
<th>未处理</th>
<th>最后失败原因</th>
</tr>
</thead>
<tbody>
{% for row in ops.send_console.accounts %}
<tr>
<td><strong>{{ row.username }}</strong><br><span class="muted">{{ row.unique_id }}</span></td>
<td>{{ row.sent_targets|length }}</td>
<td>{{ row.failed_targets|length }}</td>
<td>{{ row.pending_targets|length }}</td>
<td>{{ row.unprocessed_targets|length }}</td>
<td>{{ row.last_failure_reason or "-" }}</td>
</tr>
{% else %}
<tr><td colspan="6">暂无发送状态摘要。</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
<section class="panel" id="account-management">
<div class="section-title-row">
<div>
@@ -437,6 +505,21 @@
{% block scripts %}
<script>
(() => {
const runNowForm = document.querySelector('form[action="/ops/run-now"]');
if (!runNowForm) return;
const button = runNowForm.querySelector('button[type="submit"]');
if (button) {
button.textContent = "补发全部失败项";
}
if (!runNowForm.nextElementSibling || !runNowForm.nextElementSibling.classList.contains("failed-only-note")) {
const note = document.createElement("p");
note.className = "muted compact failed-only-note";
note.textContent = "“补发全部失败项”只处理今天 failure_queue 中的目标,不会全量重发全部账号。";
runNowForm.insertAdjacentElement("afterend", note);
}
})();
(() => {
const root = document.getElementById("login-desktop-controls");
if (!root) return;

View File

@@ -0,0 +1,185 @@
{% extends "base.html" %}
{% block title %}发送控制台{% endblock %}
{% block page_title %}发送控制台{% endblock %}
{% block content %}
{% set send_console = ops.send_console %}
<div class="stack">
<section class="panel">
<div class="section-title-row">
<div>
<h2>今日发送总览</h2>
<p class="muted compact">当前时间:{{ send_console.now }}。本页按账号展示今天已成功、失败待补发、待发送和未处理目标。</p>
</div>
<div class="button-row two" style="width: 320px;">
<a class="ghost-button" href="/">返回首页</a>
<form method="post" action="/ops/run-now">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<button type="submit">补发全部失败项</button>
</form>
</div>
</div>
<div class="stats-grid">
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">启用账号</span>
<strong class="stat-value">{{ send_console.summary.enabled_accounts }}</strong>
</div>
<div class="stat-icon blue">A</div>
</article>
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">今日成功目标</span>
<strong class="stat-value">{{ send_console.summary.today_sent_targets }}</strong>
</div>
<div class="stat-icon green">OK</div>
</article>
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">失败待补发</span>
<strong class="stat-value">{{ send_console.summary.today_failed_targets }}</strong>
</div>
<div class="stat-icon soft">!</div>
</article>
<article class="stat-card">
<div class="stat-meta">
<span class="stat-label">待发送 / 未处理</span>
<strong class="stat-value">{{ send_console.summary.today_pending_targets + send_console.summary.today_unprocessed_targets }}</strong>
</div>
<div class="stat-icon soft">...</div>
</article>
</div>
</section>
{% for account in send_console.accounts %}
<section class="panel">
<div class="section-title-row">
<div>
<h2>{{ account.username }}</h2>
<p class="muted compact">unique_id: {{ account.unique_id }}</p>
</div>
<div class="status-line">
<span class="pill soft">成功 {{ account.sent_targets|length }}</span>
<span class="pill {% if account.failed_targets %}danger{% else %}soft{% endif %}">失败 {{ account.failed_targets|length }}</span>
<span class="pill">待发送 {{ account.pending_targets|length }}</span>
<span class="pill warning">未处理 {{ account.unprocessed_targets|length }}</span>
</div>
</div>
<div class="layout-grid" style="grid-template-columns: repeat(2, minmax(0, 1fr));">
<section class="panel" style="box-shadow:none; margin:0; padding:14px;">
<h3>今日已成功</h3>
<div class="table-shell" style="margin-top: 12px;">
<table>
<thead>
<tr>
<th>目标</th>
<th>消息</th>
<th>sentAt</th>
</tr>
</thead>
<tbody>
{% for item in account.sent_targets %}
<tr>
<td>{{ item.target }}</td>
<td>{{ item.message or "-" }}</td>
<td>{{ item.sentAt or "-" }}</td>
</tr>
{% else %}
<tr><td colspan="3">暂无今日成功目标。</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
<section class="panel" style="box-shadow:none; margin:0; padding:14px;">
<h3>失败待补发</h3>
<div class="table-shell" style="margin-top: 12px;">
<table>
<thead>
<tr>
<th>目标</th>
<th>失败分类</th>
<th>失败原因</th>
<th>次数</th>
<th>操作</th>
</tr>
</thead>
<tbody>
{% for item in account.failed_targets %}
<tr>
<td>{{ item.target }}</td>
<td>{{ item.category or "-" }}</td>
<td>{{ item.reason or "-" }}</td>
<td>{{ item.attemptCount or 0 }}</td>
<td>
<form method="post" action="/accounts/{{ account.unique_id }}/retry-target">
<input type="hidden" name="csrf_token" value="{{ csrf_token }}">
<input type="hidden" name="target" value="{{ item.target }}">
<button type="submit" class="ghost-button">重试此目标</button>
</form>
</td>
</tr>
{% else %}
<tr><td colspan="5">暂无失败待补发目标。</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
<section class="panel" style="box-shadow:none; margin:0; padding:14px;">
<h3>今日待发送</h3>
<div class="table-shell" style="margin-top: 12px;">
<table>
<thead>
<tr>
<th>目标</th>
<th>scheduledAt</th>
</tr>
</thead>
<tbody>
{% for item in account.pending_targets %}
<tr>
<td>{{ item.target }}</td>
<td>{{ item.scheduledAt or "-" }}</td>
</tr>
{% else %}
<tr><td colspan="2">暂无今日待发送目标。</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
<section class="panel" style="box-shadow:none; margin:0; padding:14px;">
<h3>今日未处理</h3>
<div class="table-shell" style="margin-top: 12px;">
<table>
<thead>
<tr>
<th>目标</th>
<th>scheduledAt</th>
</tr>
</thead>
<tbody>
{% for item in account.unprocessed_targets %}
<tr>
<td>{{ item.target }}</td>
<td>{{ item.scheduledAt or "-" }}</td>
</tr>
{% else %}
<tr><td colspan="2">暂无未处理目标。</td></tr>
{% endfor %}
</tbody>
</table>
</div>
</section>
</div>
</section>
{% endfor %}
</div>
{% endblock %}