diff --git a/crates/bili_sync/src/api/response.rs b/crates/bili_sync/src/api/response.rs index ccf748a..34600d2 100644 --- a/crates/bili_sync/src/api/response.rs +++ b/crates/bili_sync/src/api/response.rs @@ -93,6 +93,9 @@ pub struct PageInfo { pub name: String, #[serde(serialize_with = "serde_page_download_status")] pub download_status: u32, + pub danmaku_last_synced_at: Option, + pub danmaku_sync_generation: u32, + pub danmaku_cid_snapshot: Option, } #[derive(Serialize, DerivePartialModel, FromQueryResult, Clone, Copy)] diff --git a/crates/bili_sync/src/api/routes/videos/mod.rs b/crates/bili_sync/src/api/routes/videos/mod.rs index 5602be6..df4e05b 100644 --- a/crates/bili_sync/src/api/routes/videos/mod.rs +++ b/crates/bili_sync/src/api/routes/videos/mod.rs @@ -1,10 +1,12 @@ use std::collections::HashSet; +use std::sync::Arc; use anyhow::{Context, Result}; use axum::extract::{Extension, Path, Query}; use axum::routing::{get, post}; use axum::{Json, Router}; use bili_sync_entity::*; +use serde::Serialize; use sea_orm::ActiveValue::Set; use sea_orm::{ ActiveModelTrait, ColumnTrait, DatabaseConnection, EntityTrait, IntoActiveModel, PaginatorTrait, QueryFilter, @@ -23,7 +25,10 @@ use crate::api::response::{ VideosResponse, }; use crate::api::wrapper::{ApiError, ApiResponse, ValidatedJson}; +use crate::bilibili::BiliClient; +use crate::config::VersionedConfig; use crate::utils::status::{PageStatus, VideoStatus}; +use crate::workflow_danmaku::{refresh_danmaku_for_page, refresh_danmaku_for_video}; pub(super) fn router() -> Router { Router::new() @@ -37,6 +42,36 @@ pub(super) fn router() -> Router { .route("/videos/{id}/update-status", post(update_video_status)) .route("/videos/reset-status", post(reset_filtered_video_status)) .route("/videos/update-status", post(update_filtered_video_status)) + .route("/videos/{id}/refresh-danmaku", post(refresh_video_danmaku)) + .route("/pages/{id}/refresh-danmaku", post(refresh_page_danmaku)) +} + +#[derive(Serialize)] +pub struct RefreshDanmakuResponse { + /// 本次实际刷新成功的 page 数量;page 级接口始终为 0 或 1。 + pub refreshed: usize, +} + +/// 手动触发:刷新某个视频所有 page 的弹幕。忽略策略,强制执行。 +pub async fn refresh_video_danmaku( + Path(id): Path, + Extension(db): Extension, + Extension(bili_client): Extension>, +) -> Result, ApiError> { + let config = VersionedConfig::get().snapshot(); + let refreshed = refresh_danmaku_for_video(id, &bili_client, &db, &config).await?; + Ok(ApiResponse::ok(RefreshDanmakuResponse { refreshed })) +} + +/// 手动触发:刷新单个 page 的弹幕。忽略策略,强制执行;走严格模式,任何错误都直接 4xx/5xx。 +pub async fn refresh_page_danmaku( + Path(id): Path, + Extension(db): Extension, + Extension(bili_client): Extension>, +) -> Result, ApiError> { + let config = VersionedConfig::get().snapshot(); + let refreshed = refresh_danmaku_for_page(id, &bili_client, &db, &config).await?; + Ok(ApiResponse::ok(RefreshDanmakuResponse { refreshed })) } /// 列出视频的基本信息,支持根据视频来源筛选、名称查找和分页 diff --git a/crates/bili_sync/src/config/current.rs b/crates/bili_sync/src/config/current.rs index cf7b8f1..0ec1c3c 100644 --- a/crates/bili_sync/src/config/current.rs +++ b/crates/bili_sync/src/config/current.rs @@ -14,7 +14,7 @@ use crate::config::default::{ default_auth_token, default_bind_address, default_collection_path, default_favorite_path, default_submission_path, default_time_format, }; -use crate::config::item::{ConcurrentLimit, NFOTimeType, SkipOption, Trigger}; +use crate::config::item::{ConcurrentLimit, DanmakuUpdatePolicy, NFOTimeType, SkipOption, Trigger}; use crate::notifier::Notifier; use crate::utils::model::{load_db_config, save_db_config}; @@ -52,6 +52,8 @@ pub struct Config { pub cdn_sorting: bool, #[serde(default)] pub try_upower_anyway: bool, + #[serde(default)] + pub danmaku_update_policy: DanmakuUpdatePolicy, pub version: u64, } @@ -105,6 +107,9 @@ impl Config { } } }; + if let Err(msg) = self.danmaku_update_policy.validate() { + errors.push(msg); + } if !errors.is_empty() { bail!(errors.into_iter().map(|e| format!("- {}", e)).join("\n")); } @@ -134,6 +139,7 @@ impl Default for Config { time_format: default_time_format(), cdn_sorting: false, try_upower_anyway: false, + danmaku_update_policy: DanmakuUpdatePolicy::default(), version: 0, } } diff --git a/crates/bili_sync/src/config/item.rs b/crates/bili_sync/src/config/item.rs index 9e582d2..1848aee 100644 --- a/crates/bili_sync/src/config/item.rs +++ b/crates/bili_sync/src/config/item.rs @@ -82,6 +82,65 @@ impl Default for Trigger { } } +/// 弹幕增量更新策略。 +/// +/// 采用三段式模型,符合弹幕密度随发布时间衰减的真实曲线: +/// - 新鲜期:发布后 `fresh_days` 天内,每 `fresh_interval_hours` 小时刷新一次。 +/// - 成熟期:新鲜期结束到 `mature_days` 天之间,每 `mature_interval_days` 天刷新一次。 +/// - 老化期:成熟期结束到 `cold_days` 天之间,每 `cold_interval_days` 天刷新一次。 +/// - 冷冻:超过 `cold_days` 后触发最后一次更新并冻结,之后不再自动刷新(手动触发仍可)。 +/// +/// 默认关闭,保持向后兼容;启用后首次下载成功即视为第一次同步。 +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct DanmakuUpdatePolicy { + pub enabled: bool, + pub fresh_days: u32, + pub fresh_interval_hours: u32, + pub mature_days: u32, + pub mature_interval_days: u32, + pub cold_days: u32, + pub cold_interval_days: u32, +} + +impl Default for DanmakuUpdatePolicy { + fn default() -> Self { + Self { + enabled: false, + fresh_days: 3, + fresh_interval_hours: 6, + mature_days: 30, + mature_interval_days: 3, + cold_days: 180, + cold_interval_days: 30, + } + } +} + +impl DanmakuUpdatePolicy { + /// 校验字段合法性:三段阈值需单调递增,时间间隔必须大于 0。 + pub fn validate(&self) -> Result<(), &'static str> { + if !self.enabled { + return Ok(()); + } + if self.fresh_days > self.mature_days { + return Err("fresh_days 不能大于 mature_days"); + } + if self.mature_days > self.cold_days { + return Err("mature_days 不能大于 cold_days"); + } + if self.fresh_interval_hours == 0 { + return Err("fresh_interval_hours 必须大于 0"); + } + if self.mature_interval_days == 0 { + return Err("mature_interval_days 必须大于 0"); + } + if self.cold_interval_days == 0 { + return Err("cold_interval_days 必须大于 0"); + } + Ok(()) + } +} + pub trait PathSafeTemplate { fn path_safe_register(&mut self, name: &'static str, template: impl Into) -> Result<()>; fn path_safe_render(&self, name: &'static str, data: &serde_json::Value) -> Result; diff --git a/crates/bili_sync/src/config/mod.rs b/crates/bili_sync/src/config/mod.rs index f0f43fc..fb4807f 100644 --- a/crates/bili_sync/src/config/mod.rs +++ b/crates/bili_sync/src/config/mod.rs @@ -2,7 +2,7 @@ mod args; mod current; mod default; mod handlebar; -mod item; +pub mod item; mod versioned_cache; mod versioned_config; diff --git a/crates/bili_sync/src/main.rs b/crates/bili_sync/src/main.rs index 6c2396b..2f396cd 100644 --- a/crates/bili_sync/src/main.rs +++ b/crates/bili_sync/src/main.rs @@ -12,6 +12,7 @@ mod notifier; mod task; mod utils; mod workflow; +mod workflow_danmaku; use std::collections::VecDeque; use std::fmt::Debug; diff --git a/crates/bili_sync/src/task/video_downloader.rs b/crates/bili_sync/src/task/video_downloader.rs index 8db6771..6710277 100644 --- a/crates/bili_sync/src/task/video_downloader.rs +++ b/crates/bili_sync/src/task/video_downloader.rs @@ -14,6 +14,7 @@ use crate::config::{ARGS, Config, TEMPLATE, Trigger, VersionedConfig}; use crate::utils::model::get_enabled_video_sources; use crate::utils::notify::error_and_notify; use crate::workflow::process_video_source; +use crate::workflow_danmaku::refresh_danmaku_incremental; static INSTANCE: OnceCell = OnceCell::const_new(); @@ -369,5 +370,13 @@ async fn download_video( } } } + // 主下载流程结束后,进行一次弹幕增量刷新扫描。策略未启用时内部直接返回,零开销。 + if let Err(e) = refresh_danmaku_incremental(&bili_client, connection, config).await { + error_and_notify( + config, + &bili_client, + format!("弹幕增量更新遇到错误:{:#}", e), + ); + } Ok(()) } diff --git a/crates/bili_sync/src/utils/danmaku_schedule.rs b/crates/bili_sync/src/utils/danmaku_schedule.rs new file mode 100644 index 0000000..14aba94 --- /dev/null +++ b/crates/bili_sync/src/utils/danmaku_schedule.rs @@ -0,0 +1,361 @@ +//! 弹幕增量更新的调度决策函数(纯函数,易测试)。 +//! +//! 依据发布时间(pubtime)和上次同步时间(last_synced),给出当前时刻是否应该 +//! 触发弹幕刷新的判决。策略参数来自 [`DanmakuUpdatePolicy`],采用三段式: +//! 新鲜期 -> 成熟期 -> 老化期 -> 冷冻。 + +use chrono::{DateTime, Duration, Utc}; + +use crate::config::item::DanmakuUpdatePolicy; + +/// 弹幕同步阶段(与数据库 `page.danmaku_sync_generation` 字段一一对应)。 +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Stage { + /// 从未同步(0)。 + Initial = 0, + /// 新鲜期(1),发布后 `fresh_days` 内。 + Fresh = 1, + /// 成熟期(2)。 + Mature = 2, + /// 老化期(3)。 + Cold = 3, + /// 冷冻(4),不再自动同步。 + Frozen = 4, +} + +impl Stage { + pub fn from_generation(g: u32) -> Self { + match g { + 0 => Stage::Initial, + 1 => Stage::Fresh, + 2 => Stage::Mature, + 3 => Stage::Cold, + _ => Stage::Frozen, + } + } + + pub fn as_generation(self) -> u32 { + self as u32 + } +} + +/// 决策结果。 +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Decision { + /// 无需同步。 + Skip, + /// 应同步,并将下一次写入的阶段推进到 `next_stage`。 + Sync { next_stage: Stage }, +} + +/// 仅根据视频年龄计算 page 应处于的阶段。 +/// +/// `allow_freeze` 控制是否允许返回 [`Stage::Frozen`]: +/// - 调度路径:传 `true`,超过 cold_days 的视频会被冻结(最后触发一次后不再自动同步)。 +/// - 手动触发路径:传 `false`,cap 在 [`Stage::Cold`],避免用户手动刷新后视频反而被冻结。 +pub fn stage_for_age( + policy: &DanmakuUpdatePolicy, + pubtime: DateTime, + now: DateTime, + allow_freeze: bool, +) -> Stage { + let age = now.signed_duration_since(pubtime).max(Duration::zero()); + let fresh_end = Duration::days(policy.fresh_days as i64); + let mature_end = Duration::days(policy.mature_days as i64); + let cold_end = Duration::days(policy.cold_days as i64); + if allow_freeze && age >= cold_end { + Stage::Frozen + } else if age < fresh_end { + Stage::Fresh + } else if age < mature_end { + Stage::Mature + } else { + Stage::Cold + } +} + +/// 判断某个 page 是否应该在当前时刻触发弹幕更新。 +/// +/// 语义说明: +/// - `generation` 表示**当前阶段**(`last_synced` 所处的阶段),如果 `generation` 已经是 `Frozen`, +/// 永远返回 `Skip`。 +/// - 根据当前时间相对 `pubtime` 的年龄,判断"本次应该处于哪个阶段"(target_stage); +/// - 结合 `last_synced` 和对应阶段的间隔,判断是否应该刷新; +/// - 若 target_stage 已经超过 cold_days,返回一次 `Sync { next_stage: Frozen }`(最后触发一次即冻结)。 +/// +/// 首次同步(`last_synced=None`):只要策略开启且未冻结,立即触发。 +pub fn should_sync_danmaku( + policy: &DanmakuUpdatePolicy, + pubtime: DateTime, + last_synced: Option>, + generation: u32, + now: DateTime, +) -> Decision { + if !policy.enabled { + return Decision::Skip; + } + + let current_stage = Stage::from_generation(generation); + if current_stage == Stage::Frozen { + return Decision::Skip; + } + + let target_stage = stage_for_age(policy, pubtime, now, true); + if target_stage == Stage::Frozen { + // 超过冷冻期:最后触发一次(无论之前是否同步过),之后置为 Frozen + return Decision::Sync { next_stage: Stage::Frozen }; + } + + let interval = match target_stage { + Stage::Fresh => Duration::hours(policy.fresh_interval_hours as i64), + Stage::Mature => Duration::days(policy.mature_interval_days as i64), + Stage::Cold => Duration::days(policy.cold_interval_days as i64), + // 上面已 early-return,理论不可达 + Stage::Initial | Stage::Frozen => Duration::zero(), + }; + + match last_synced { + // 从未同步过,立即触发 + None => Decision::Sync { next_stage: target_stage }, + Some(ts) => { + let since_last = now.signed_duration_since(ts); + // 阶段刚刚迁移(比如从新鲜期迈入成熟期),立即触发一次,把 generation 同步到新阶段 + if target_stage.as_generation() > current_stage.as_generation() { + return Decision::Sync { next_stage: target_stage }; + } + if since_last >= interval { + Decision::Sync { next_stage: target_stage } + } else { + Decision::Skip + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn policy() -> DanmakuUpdatePolicy { + DanmakuUpdatePolicy { + enabled: true, + fresh_days: 3, + fresh_interval_hours: 6, + mature_days: 30, + mature_interval_days: 3, + cold_days: 180, + cold_interval_days: 30, + } + } + + fn t(days: i64, hours: i64) -> DateTime { + DateTime::::from_timestamp(0, 0).unwrap() + Duration::days(days) + Duration::hours(hours) + } + + #[test] + fn disabled_always_skip() { + let mut p = policy(); + p.enabled = false; + let now = t(10, 0); + let pub_t = t(0, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, None, 0, now), + Decision::Skip + ); + } + + #[test] + fn first_time_fresh_triggers_immediately() { + let p = policy(); + let pub_t = t(0, 0); + let now = t(0, 1); // 发布 1 小时后,首次 + assert_eq!( + should_sync_danmaku(&p, pub_t, None, 0, now), + Decision::Sync { next_stage: Stage::Fresh } + ); + } + + #[test] + fn fresh_interval_not_elapsed_skips() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(0, 2); + let now = t(0, 5); // 距上次 3 小时,不足 6 小时 + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Fresh.as_generation(), now), + Decision::Skip + ); + } + + #[test] + fn fresh_interval_elapsed_syncs() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(0, 2); + let now = t(0, 9); // 距上次 7 小时,超过 6 小时 + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Fresh.as_generation(), now), + Decision::Sync { next_stage: Stage::Fresh } + ); + } + + #[test] + fn stage_transition_fresh_to_mature_triggers_once() { + // 上次在新鲜期同步(2h),现在已进入成熟期(第 5 天),即使成熟期的间隔(3 天)未到, + // 也应立即触发一次,推进 generation。 + let p = policy(); + let pub_t = t(0, 0); + let last = t(0, 2); + let now = t(5, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Fresh.as_generation(), now), + Decision::Sync { next_stage: Stage::Mature } + ); + } + + #[test] + fn mature_interval_not_elapsed_skips() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(5, 0); // 成熟期开始就同步了一次 + let now = t(7, 0); // 过了 2 天,不足 3 天 + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Mature.as_generation(), now), + Decision::Skip + ); + } + + #[test] + fn mature_interval_elapsed_syncs() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(5, 0); + let now = t(9, 0); // 过了 4 天 + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Mature.as_generation(), now), + Decision::Sync { next_stage: Stage::Mature } + ); + } + + #[test] + fn mature_to_cold_stage_transition() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(15, 0); // 成熟期中段 + let now = t(35, 0); // 已进入老化期(>30 天) + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Mature.as_generation(), now), + Decision::Sync { next_stage: Stage::Cold } + ); + } + + #[test] + fn cold_interval_elapsed_syncs() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(40, 0); + let now = t(80, 0); // 过了 40 天 + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Cold.as_generation(), now), + Decision::Sync { next_stage: Stage::Cold } + ); + } + + #[test] + fn exceeding_cold_days_final_sync_then_freeze() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(100, 0); + let now = t(181, 0); // 超过 cold_days=180 + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Cold.as_generation(), now), + Decision::Sync { next_stage: Stage::Frozen } + ); + } + + #[test] + fn stage_for_age_classifies_correctly() { + let p = policy(); + let pub_t = t(0, 0); + // age=0 → Fresh + assert_eq!(stage_for_age(&p, pub_t, t(0, 1), true), Stage::Fresh); + // age=4 days → Mature + assert_eq!(stage_for_age(&p, pub_t, t(4, 0), true), Stage::Mature); + // age=40 days → Cold + assert_eq!(stage_for_age(&p, pub_t, t(40, 0), true), Stage::Cold); + // age=200 days, allow_freeze=true → Frozen + assert_eq!(stage_for_age(&p, pub_t, t(200, 0), true), Stage::Frozen); + // age=200 days, allow_freeze=false → Cold(手动模式不冻结) + assert_eq!(stage_for_age(&p, pub_t, t(200, 0), false), Stage::Cold); + } + + #[test] + fn frozen_stays_frozen() { + let p = policy(); + let pub_t = t(0, 0); + let last = t(181, 0); + let now = t(500, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Frozen.as_generation(), now), + Decision::Skip + ); + } + + #[test] + fn never_synced_old_video_final_sync() { + // 一个老视频第一次被纳入同步范围(age 已超 cold_days),应触发一次并直接冻结。 + let p = policy(); + let pub_t = t(0, 0); + let now = t(200, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, None, 0, now), + Decision::Sync { next_stage: Stage::Frozen } + ); + } + + #[test] + fn pubtime_in_future_clamps_to_zero_age() { + // 时钟偏差导致发布时间晚于当前时间:按 age=0 处理(新鲜期首次)。 + let p = policy(); + let pub_t = t(10, 0); + let now = t(5, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, None, 0, now), + Decision::Sync { next_stage: Stage::Fresh } + ); + } + + #[test] + fn policy_as_once_after_days_equivalent() { + // 方案 A:只触发一次然后冻结。通过把 fresh/mature 设为 0、cold_days=N 实现。 + let p = DanmakuUpdatePolicy { + enabled: true, + fresh_days: 0, + fresh_interval_hours: 1, + mature_days: 0, + mature_interval_days: 1, + cold_days: 7, + cold_interval_days: 999_999, // 实际不会触发 + }; + let pub_t = t(0, 0); + // 7 天内,cold_interval_days 极大,last_synced=None 首次必触发 + let now = t(3, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, None, 0, now), + Decision::Sync { next_stage: Stage::Cold } + ); + // 已同步过,间隔极大,跳过 + let last = t(3, 0); + let now2 = t(6, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Cold.as_generation(), now2), + Decision::Skip + ); + // 超过 7 天,最后一次 + 冻结 + let now3 = t(8, 0); + assert_eq!( + should_sync_danmaku(&p, pub_t, Some(last), Stage::Cold.as_generation(), now3), + Decision::Sync { next_stage: Stage::Frozen } + ); + } +} diff --git a/crates/bili_sync/src/utils/mod.rs b/crates/bili_sync/src/utils/mod.rs index a308d29..3ceb713 100644 --- a/crates/bili_sync/src/utils/mod.rs +++ b/crates/bili_sync/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod convert; +pub mod danmaku_schedule; pub mod download_context; pub mod filenamify; pub mod format_arg; diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index 43804cc..f043a0a 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -103,11 +103,20 @@ pub async fn update_videos_model(videos: Vec, connection: &D Ok(()) } -/// 更新视频页 model 的下载状态 +/// 更新视频页 model 的下载状态。 +/// +/// 弹幕同步的三个字段(`danmaku_last_synced_at` / `danmaku_sync_generation` / `danmaku_cid_snapshot`) +/// 也在此处一并写回:首次下载完成后由 `download_page` 填值,避免被随后的弹幕增量扫描误判为"从未同步"。 pub async fn update_pages_model(pages: Vec, connection: &DatabaseConnection) -> Result<()> { let query = page::Entity::insert_many(pages).on_conflict( OnConflict::column(page::Column::Id) - .update_columns([page::Column::DownloadStatus, page::Column::Path]) + .update_columns([ + page::Column::DownloadStatus, + page::Column::Path, + page::Column::DanmakuLastSyncedAt, + page::Column::DanmakuSyncGeneration, + page::Column::DanmakuCidSnapshot, + ]) .to_owned(), ); query.exec(connection).await?; diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index af3bef3..9271d6e 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -527,6 +527,9 @@ pub async fn download_page( ); let results = [res_1.into(), res_2.into(), res_3.into(), res_4.into(), res_5.into()]; status.update_status(&results); + // 弹幕子任务在本轮"现下载成功"时(仅 Succeeded,不算 Skipped/Failed), + // 用于稍后补写弹幕同步元数据。提前到 results 移动之前求值。 + let danmaku_just_succeeded = matches!(results[3], ExecutionStatus::Succeeded); results .iter() .zip(["封面", "视频", "详情", "弹幕", "字幕"]) @@ -559,9 +562,27 @@ pub async fn download_page( bail!(e); } } + // 弹幕子任务在本轮"现下载成功"时(不是 Skipped),且策略已启用,才补写三个同步元数据字段。 + // + // - 不写在 Skipped 路径:避免把"未启用 no_danmaku 跳过"也算作一次成功同步。 + // - 不写在策略关闭路径:用户没有表达"持续追踪弹幕"的意图。如果这里无脑写入, + // 一旦用户后续启用策略,老视频会被 stage_for_age(allow_freeze=true) 直接打成 Frozen, + // 导致 refresh_danmaku_incremental 永远跳过它们,再也不会收到首次策略驱动的刷新。 let mut page_active_model: page::ActiveModel = page_model.into(); page_active_model.download_status = Set(status.into()); page_active_model.path = Set(Some(video_path.to_string_lossy().to_string())); + if danmaku_just_succeeded && cx.config.danmaku_update_policy.enabled { + let now = chrono::Utc::now(); + let stage = crate::utils::danmaku_schedule::stage_for_age( + &cx.config.danmaku_update_policy, + video_model.pubtime.and_utc(), + now, + /* allow_freeze */ true, + ); + page_active_model.danmaku_last_synced_at = Set(Some(now.naive_utc().to_string())); + page_active_model.danmaku_sync_generation = Set(stage.as_generation()); + page_active_model.danmaku_cid_snapshot = Set(Some(page_info.cid)); + } Ok(page_active_model) } diff --git a/crates/bili_sync/src/workflow_danmaku.rs b/crates/bili_sync/src/workflow_danmaku.rs new file mode 100644 index 0000000..94ad059 --- /dev/null +++ b/crates/bili_sync/src/workflow_danmaku.rs @@ -0,0 +1,570 @@ +//! 弹幕增量更新工作流。 +//! +//! 与 [`crate::workflow`] 中的"首次下载"流程解耦:这里只负责在视频已下载成功后, +//! 按照 [`crate::config::item::DanmakuUpdatePolicy`] 的策略周期性重新拉取弹幕。 +//! +//! 两种入口: +//! - [`refresh_danmaku_incremental`]:扫描所有已启用视频源里的 page,应用策略,逐个刷新。 +//! - [`refresh_danmaku_for_video`] / [`refresh_danmaku_for_page`]:手动触发,忽略策略。 +//! +//! 流程内会顺带做 **UP 换源检测**:调用一次 `get_view_info` 读最新的 cid/duration/width/height, +//! 与数据库缓存对比,发现变化时更新 page 表对应字段。cid 变化时同时清除 `download_status` 的 +//! 弹幕位,强制后续按新 cid 重建。 + +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result, anyhow, bail}; +use bili_sync_entity::*; +use chrono::{DateTime, TimeZone, Utc}; +use sea_orm::ActiveValue::Set; +use sea_orm::entity::prelude::*; + +use crate::bilibili::{BiliClient, Dimension, PageInfo as BiliPageInfo, Video, VideoInfo}; +use crate::config::Config; +use crate::utils::danmaku_schedule::{Decision, Stage, should_sync_danmaku}; + +/// 弹幕子任务在 download_status 中的位偏移(与 PageStatus 保持一致)。 +const DANMAKU_STATUS_OFFSET: usize = 3; + +/// 扫描所有视频源,按 [`DanmakuUpdatePolicy`] 刷新到期的 page 弹幕。 +/// +/// 策略未启用时直接返回。不会影响任何主下载流程。 +pub async fn refresh_danmaku_incremental( + bili_client: &BiliClient, + connection: &DatabaseConnection, + config: &Config, +) -> Result<()> { + if !config.danmaku_update_policy.enabled { + return Ok(()); + } + if config.skip_option.no_danmaku { + return Ok(()); + } + info!("开始执行本轮弹幕增量更新.."); + let candidates = load_candidate_videos(connection).await?; + let now = Utc::now(); + let mut processed = 0usize; + let mut refreshed = 0usize; + for (video_model, pages) in candidates { + let selected = pages + .into_iter() + .filter_map(|page| { + let pubtime = video_model.pubtime.and_utc(); + let last_synced = page + .danmaku_last_synced_at + .as_deref() + .and_then(parse_stored_datetime); + match should_sync_danmaku( + &config.danmaku_update_policy, + pubtime, + last_synced, + page.danmaku_sync_generation, + now, + ) { + Decision::Sync { next_stage } => Some((page, Some(next_stage))), + Decision::Skip => None, + } + }) + .collect::>(); + if selected.is_empty() { + continue; + } + match refresh_video_pages(bili_client, connection, config, &video_model, selected, now).await { + Ok(n) => { + refreshed += n; + processed += 1; + } + Err(e) => { + error!( + "刷新视频「{}」({}) 的弹幕失败:{:#}", + video_model.name, video_model.bvid, e + ); + } + } + } + info!( + "弹幕增量更新结束:处理视频 {} 个,刷新分页 {} 个", + processed, refreshed + ); + Ok(()) +} + +/// 手动触发:刷新某个视频所有 page 的弹幕(忽略策略,强制执行)。 +pub async fn refresh_danmaku_for_video( + video_id: i32, + bili_client: &BiliClient, + connection: &DatabaseConnection, + config: &Config, +) -> Result { + let video_model = video::Entity::find_by_id(video_id) + .one(connection) + .await? + .ok_or_else(|| anyhow!("video {} 不存在", video_id))?; + let pages = page::Entity::find() + .filter(page::Column::VideoId.eq(video_id)) + .all(connection) + .await?; + if pages.is_empty() { + return Ok(0); + } + let now = Utc::now(); + // 手动触发:next_stage 传 None,让 refresh_one_page 内部按 age 计算(且不会冻结)。 + // 这样不会把 Mature/Cold 视频回退成 Fresh,也不会把活跃视频意外冻结。 + let selected = pages.into_iter().map(|p| (p, None)).collect(); + refresh_video_pages(bili_client, connection, config, &video_model, selected, now).await +} + +/// 手动触发:刷新单个 page 的弹幕(忽略策略,强制执行)。 +/// +/// 与 [`refresh_danmaku_for_video`] 的 best-effort 模式不同,本接口走严格模式: +/// 只要存在任何错误(page 不存在、view_info 拉取失败、新 view_info 中 pid 不再出现、 +/// 弹幕写入失败等)都直接 bail,确保 API 调用方不会收到"假成功"。 +/// +/// 成功时返回刷新成功的 page 数(恒为 1)。 +pub async fn refresh_danmaku_for_page( + page_id: i32, + bili_client: &BiliClient, + connection: &DatabaseConnection, + config: &Config, +) -> Result { + let page_model = page::Entity::find_by_id(page_id) + .one(connection) + .await? + .ok_or_else(|| anyhow!("page {} 不存在", page_id))?; + let video_model = video::Entity::find_by_id(page_model.video_id) + .one(connection) + .await? + .ok_or_else(|| anyhow!("page {} 的宿主 video 不存在", page_id))?; + let now = Utc::now(); + let bili_video = Video::new(bili_client, video_model.bvid.as_str(), &config.credential); + let view_info = bili_video + .get_view_info() + .await + .with_context(|| format!("获取视频 {} 的 view_info 失败", video_model.bvid))?; + let VideoInfo::Detail { pages: fresh_pages, .. } = view_info else { + bail!("view_info 返回了非 Detail 类型,无法刷新弹幕"); + }; + let fresh = fresh_pages + .iter() + .find(|p| p.page == page_model.pid) + .ok_or_else(|| { + anyhow!( + "视频「{}」({}) 的分页 pid={} 在最新的 view_info 中已不存在", + video_model.name, + video_model.bvid, + page_model.pid + ) + })?; + refresh_one_page( + &bili_video, + connection, + config, + &video_model, + page_model, + fresh, + None, + now, + ) + .await?; + Ok(1) +} + +/// 候选视频:有效 + 有路径(至少下载过) + 至少存在一个 page 的 download_status 弹幕位已成功, +/// 且**所属源仍处于启用状态**。 +/// +/// 与项目里其他流程保持一致:disabled 源被视为"用户主动暂停处理",弹幕增量也不再触碰它的内容, +/// 避免后台默默地继续请求 B 站接口和改写本地 ASS 文件。 +async fn load_candidate_videos( + connection: &DatabaseConnection, +) -> Result)>> { + use sea_orm::{Condition, QuerySelect}; + + // 一次性取齐四类启用源的 id 集合 + let favorite_ids: Vec = favorite::Entity::find() + .filter(favorite::Column::Enabled.eq(true)) + .select_only() + .column(favorite::Column::Id) + .into_tuple() + .all(connection) + .await + .context("load enabled favorite ids failed")?; + let collection_ids: Vec = collection::Entity::find() + .filter(collection::Column::Enabled.eq(true)) + .select_only() + .column(collection::Column::Id) + .into_tuple() + .all(connection) + .await + .context("load enabled collection ids failed")?; + let submission_ids: Vec = submission::Entity::find() + .filter(submission::Column::Enabled.eq(true)) + .select_only() + .column(submission::Column::Id) + .into_tuple() + .all(connection) + .await + .context("load enabled submission ids failed")?; + let watch_later_ids: Vec = watch_later::Entity::find() + .filter(watch_later::Column::Enabled.eq(true)) + .select_only() + .column(watch_later::Column::Id) + .into_tuple() + .all(connection) + .await + .context("load enabled watch_later ids failed")?; + + // 至少一个外键命中启用集合,才纳入候选;全部为空时直接 early-return 避免无意义查询。 + if favorite_ids.is_empty() + && collection_ids.is_empty() + && submission_ids.is_empty() + && watch_later_ids.is_empty() + { + return Ok(Vec::new()); + } + let mut source_filter = Condition::any(); + if !favorite_ids.is_empty() { + source_filter = source_filter.add(video::Column::FavoriteId.is_in(favorite_ids)); + } + if !collection_ids.is_empty() { + source_filter = source_filter.add(video::Column::CollectionId.is_in(collection_ids)); + } + if !submission_ids.is_empty() { + source_filter = source_filter.add(video::Column::SubmissionId.is_in(submission_ids)); + } + if !watch_later_ids.is_empty() { + source_filter = source_filter.add(video::Column::WatchLaterId.is_in(watch_later_ids)); + } + + video::Entity::find() + .filter( + Condition::all() + .add(video::Column::Valid.eq(true)) + .add(video::Column::Path.ne("")) + .add(source_filter), + ) + .find_with_related(page::Entity) + .all(connection) + .await + .context("load candidate videos for danmaku refresh failed") + .map(|rows| { + rows.into_iter() + .map(|(v, pages)| { + // 只保留弹幕任务已经成功过的 page;从未成功过的交给主流程处理 + let filtered = pages + .into_iter() + .filter(|p| danmaku_subtask_completed(p.download_status)) + .collect::>(); + (v, filtered) + }) + .filter(|(_, pages)| !pages.is_empty()) + .collect() + }) +} + +/// 检查 download_status 中弹幕子任务是否为 STATUS_OK(值为 7)。 +fn danmaku_subtask_completed(status: u32) -> bool { + let slot = (status >> (DANMAKU_STATUS_OFFSET * 3)) & 0b111; + slot == crate::utils::status::STATUS_OK +} + +/// UP 主换源(cid 变化)后,将 page 的所有非弹幕子任务(封面/视频/NFO/字幕)位重置为 +/// `STATUS_NOT_STARTED`,让主下载流程下一轮重新拉取 MP4/SRT/封面等本地资产。 +/// +/// **保留弹幕位为 STATUS_OK**:因为本次刷新已经用新 cid 写入了正确的 ASS 文件。 +/// 同时清掉 STATUS_COMPLETED 高位,让 page 重新进入"未完成"状态。 +fn reset_non_danmaku_subtasks(status: u32) -> u32 { + let mut new_status = status; + for offset in 0..5 { + if offset == DANMAKU_STATUS_OFFSET { + continue; + } + new_status &= !(0b111 << (offset * 3)); + } + new_status & !(1 << 31) // 清完成标记 +} + +/// 当某个 page 的 cid 变了之后,需要让其所属 video 重新进入 `filter_unhandled_video_pages` +/// 的候选集。两个条件: +/// 1. 清掉 STATUS_COMPLETED 高位(否则 `lt(STATUS_COMPLETED)` 过滤会把它直接排除)。 +/// 2. 把视频层"分页下载"子任务(offset 4)位归零,让 `should_run` 重新返回 true。 +fn reset_video_for_page_redownload(status: u32) -> u32 { + const PAGE_DOWNLOAD_OFFSET: usize = 4; + let cleared = status & !(0b111 << (PAGE_DOWNLOAD_OFFSET * 3)); + cleared & !(1 << 31) +} + +/// 对某个视频下选中的 page 做一次弹幕刷新:拉 view_info 检测换源 → 逐个重抓弹幕 → 更新元数据。 +/// +/// 返回本次成功刷新的 page 数量。 +async fn refresh_video_pages( + bili_client: &BiliClient, + connection: &DatabaseConnection, + config: &Config, + video_model: &video::Model, + selected: Vec<(page::Model, Option)>, + now: DateTime, +) -> Result { + let bili_video = Video::new(bili_client, video_model.bvid.as_str(), &config.credential); + // 拉一次 view_info,拿到最新的 cid/duration/dimension;失败则本轮跳过该视频 + let view_info = bili_video + .get_view_info() + .await + .with_context(|| format!("刷新视频 {} 时获取 view_info 失败", video_model.bvid))?; + let VideoInfo::Detail { pages: fresh_pages, .. } = view_info else { + bail!("view_info 返回了非 Detail 类型,无法刷新弹幕"); + }; + let mut success = 0usize; + for (db_page, next_stage) in selected { + let fresh = fresh_pages.iter().find(|p| p.page == db_page.pid); + let Some(fresh) = fresh else { + warn!( + "视频「{}」({}) 的分页 pid={} 在新拉取的 view_info 中不存在,跳过", + video_model.name, video_model.bvid, db_page.pid + ); + continue; + }; + if let Err(e) = refresh_one_page( + &bili_video, + connection, + config, + video_model, + db_page, + fresh, + next_stage, + now, + ) + .await + { + error!( + "刷新视频「{}」({}) 分页 pid={} 弹幕失败:{:#}", + video_model.name, video_model.bvid, fresh.page, e + ); + continue; + } + success += 1; + } + Ok(success) +} + +/// `next_stage` 语义: +/// - `Some(stage)`:调度路径,使用决策函数算好的阶段(可能是 `Frozen`)。 +/// - `None`:手动触发路径,按 page 当前年龄计算阶段,不允许冻结(cap 在 `Cold`), +/// 避免用户手动刷新已成熟视频时被回退成 `Fresh`,也避免活跃视频被意外冻结。 +async fn refresh_one_page( + bili_video: &Video<'_>, + connection: &DatabaseConnection, + config: &Config, + video_model: &video::Model, + db_page: page::Model, + fresh: &BiliPageInfo, + next_stage: Option, + now: DateTime, +) -> Result<()> { + let pubtime = video_model.pubtime.and_utc(); + let resolved_stage = next_stage.unwrap_or_else(|| { + crate::utils::danmaku_schedule::stage_for_age( + &config.danmaku_update_policy, + pubtime, + now, + /* allow_freeze */ false, + ) + }); + let danmaku_path = resolve_danmaku_path(video_model, &db_page)?; + let (fresh_width, fresh_height) = extract_dimension(fresh.dimension.as_ref()); + let cid_changed = db_page.cid != fresh.cid; + let duration_changed = db_page.duration != fresh.duration; + let dimension_changed = fresh_width != db_page.width || fresh_height != db_page.height; + + if cid_changed { + warn!( + "检测到视频「{}」({}) 分页 pid={} 的 cid 发生变化 ({} -> {}),可能是 UP 主换源,已重置弹幕状态", + video_model.name, video_model.bvid, fresh.page, db_page.cid, fresh.cid + ); + } + + // 使用最新的 PageInfo 构造弹幕请求:保证换源后的新 duration 被用于分段数 + let page_info_for_danmaku = BiliPageInfo { + cid: fresh.cid, + page: fresh.page, + name: db_page.name.clone(), + duration: fresh.duration, + first_frame: fresh.first_frame.clone(), + dimension: fresh.dimension.as_ref().map(|d| Dimension { + width: d.width, + height: d.height, + rotate: d.rotate, + }), + }; + + // 原子写入:先写到 .tmp,再 rename,避免播放器读到半截 ASS + let tmp_path = make_tmp_path(&danmaku_path); + bili_video + .get_danmaku_writer(&page_info_for_danmaku) + .await? + .write(tmp_path.clone(), &config.danmaku_option) + .await?; + tokio::fs::rename(&tmp_path, &danmaku_path) + .await + .with_context(|| format!("重命名弹幕文件 {:?} -> {:?} 失败", tmp_path, danmaku_path))?; + + // 写回数据库 + let now_str = now.naive_utc().to_string(); + let mut active: page::ActiveModel = db_page.clone().into(); + active.danmaku_last_synced_at = Set(Some(now_str)); + active.danmaku_sync_generation = Set(resolved_stage.as_generation()); + active.danmaku_cid_snapshot = Set(Some(fresh.cid)); + if cid_changed { + // cid 变化 = UP 主把这页换成了不同内容(不是简单修正)。本地的 MP4/SRT/封面/NFO 都还指向 + // 旧 cid 的内容,必须让主下载流程重抓一次。这里: + // 1. 清掉 page 的非弹幕子任务位(弹幕已经用新 cid 写盘,保留 OK,避免下一轮 incremental 又跑一次)。 + // 2. 同时清掉所属 video 的"分页下载"子任务 + STATUS_COMPLETED 高位,让 video 重新被 + // filter_unhandled_video_pages 选中。否则 page 标记是"未完成"也没用,video 高位拦着。 + active.cid = Set(fresh.cid); + active.download_status = Set(reset_non_danmaku_subtasks(db_page.download_status)); + let new_video_status = reset_video_for_page_redownload(video_model.download_status); + if new_video_status != video_model.download_status { + let mut video_active: video::ActiveModel = video_model.clone().into(); + video_active.download_status = Set(new_video_status); + video_active + .update(connection) + .await + .context("cid 变化后重置 video.download_status 失败")?; + } + } + if duration_changed { + active.duration = Set(fresh.duration); + } + if dimension_changed { + active.width = Set(fresh_width); + active.height = Set(fresh_height); + } + active.update(connection).await.context("更新 page 弹幕同步状态失败")?; + info!( + "视频「{}」({}) 分页 pid={} 弹幕已刷新 -> stage={:?}", + video_model.name, video_model.bvid, fresh.page, resolved_stage + ); + Ok(()) +} + +/// 依据 [`Dimension::rotate`] 得到数据库里应保存的 (width, height)。 +fn extract_dimension(d: Option<&Dimension>) -> (Option, Option) { + match d { + Some(d) if d.rotate == 0 => (Some(d.width), Some(d.height)), + Some(d) => (Some(d.height), Some(d.width)), + None => (None, None), + } +} + +/// 根据 page_model.path 推断出弹幕 ASS 文件应写入的路径。 +/// +/// 与 [`crate::workflow::download_page`] 的拼接规则保持一致: +/// - 单页视频: `{base_path}/{base_name}.zh-CN.default.ass` +/// - 多页视频: `{base_path}/Season 1/{base_name} - S01E{pid}.zh-CN.default.ass` +fn resolve_danmaku_path(video_model: &video::Model, page_model: &page::Model) -> Result { + let is_single_page = video_model.single_page.context("single_page is null")?; + let old_video_path = page_model + .path + .as_deref() + .filter(|s| !s.is_empty()) + .ok_or_else(|| anyhow!("page 未记录下载路径,无法推断弹幕位置"))?; + let old_video_path = Path::new(old_video_path); + let old_video_filename = old_video_path + .file_name() + .context("invalid page path format")? + .to_string_lossy(); + if is_single_page { + let base_path = old_video_path.parent().context("invalid page path format")?; + let base_name = old_video_filename.trim_end_matches(".mp4"); + Ok(base_path.join(format!("{}.zh-CN.default.ass", base_name))) + } else { + let base_path = old_video_path + .parent() + .and_then(|p| p.parent()) + .context("invalid page path format")?; + let base_name = old_video_filename + .rsplit_once(" - ") + .context("invalid page path format")? + .0; + Ok(base_path + .join("Season 1") + .join(format!( + "{} - S01E{:0>2}.zh-CN.default.ass", + base_name, page_model.pid + ))) + } +} + +fn make_tmp_path(target: &Path) -> PathBuf { + let mut s = target.as_os_str().to_os_string(); + s.push(".tmp"); + PathBuf::from(s) +} + +/// 解析数据库中 `danmaku_last_synced_at` 字符串(NaiveDateTime::to_string 的格式,例如 "2026-04-13 10:20:30")。 +fn parse_stored_datetime(s: &str) -> Option> { + // NaiveDateTime::to_string() 产出 "YYYY-MM-DD HH:MM:SS[.fraction]" + chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%.f") + .or_else(|_| chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S")) + .ok() + .map(|naive| Utc.from_utc_datetime(&naive)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::status::STATUS_OK; + + #[test] + fn danmaku_completed_detects_ok() { + let with_danmaku_ok: u32 = STATUS_OK << 9; + assert!(danmaku_subtask_completed(with_danmaku_ok)); + let without: u32 = STATUS_OK << 6; // 视频信息位,不是弹幕 + assert!(!danmaku_subtask_completed(without)); + } + + #[test] + fn reset_non_danmaku_subtasks_keeps_only_danmaku_ok() { + // 五个子任务都 OK + 完成位 + let all_ok_completed: u32 = (1u32 << 31) + | (0..5) + .map(|i| STATUS_OK << (i * 3)) + .fold(0u32, |a, b| a | b); + let reset = reset_non_danmaku_subtasks(all_ok_completed); + // 弹幕位保留 + assert_eq!((reset >> 9) & 0b111, STATUS_OK); + // 其它四个位都被清零 + for i in [0usize, 1, 2, 4] { + assert_eq!((reset >> (i * 3)) & 0b111, 0); + } + // 完成位被清掉 + assert_eq!(reset >> 31, 0); + } + + #[test] + fn reset_video_for_page_redownload_clears_subtask_4_and_completed_bit() { + // 五个子任务都 OK + 完成位 + let video_done: u32 = (1u32 << 31) + | (0..5) + .map(|i| STATUS_OK << (i * 3)) + .fold(0u32, |a, b| a | b); + let reset = reset_video_for_page_redownload(video_done); + // offset 4(分页下载子任务)被清零 + assert_eq!((reset >> 12) & 0b111, 0); + // 其它子任务保留 + for i in [0usize, 1, 2, 3] { + assert_eq!((reset >> (i * 3)) & 0b111, STATUS_OK); + } + // 完成位被清掉 + assert_eq!(reset >> 31, 0); + } + + #[test] + fn parse_stored_datetime_roundtrip() { + let now = chrono::Utc + .with_ymd_and_hms(2026, 4, 13, 10, 20, 30) + .unwrap(); + let s = now.naive_utc().to_string(); + let parsed = parse_stored_datetime(&s).expect("parse ok"); + assert_eq!(parsed, now); + } +} diff --git a/crates/bili_sync_entity/src/entities/page.rs b/crates/bili_sync_entity/src/entities/page.rs index b794790..e351194 100644 --- a/crates/bili_sync_entity/src/entities/page.rs +++ b/crates/bili_sync_entity/src/entities/page.rs @@ -18,6 +18,9 @@ pub struct Model { pub image: Option, pub download_status: u32, pub created_at: String, + pub danmaku_last_synced_at: Option, + pub danmaku_sync_generation: u32, + pub danmaku_cid_snapshot: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/crates/bili_sync_migration/src/lib.rs b/crates/bili_sync_migration/src/lib.rs index 44c3a9c..b656a73 100644 --- a/crates/bili_sync_migration/src/lib.rs +++ b/crates/bili_sync_migration/src/lib.rs @@ -11,6 +11,7 @@ mod m20250712_080013_add_video_created_at_index; mod m20250903_094454_add_rule_and_should_download; mod m20251009_123713_add_use_dynamic_api; mod m20260324_055217_add_staff; +mod m20260413_000001_add_danmaku_sync_fields; pub struct Migrator; @@ -29,6 +30,7 @@ impl MigratorTrait for Migrator { Box::new(m20250903_094454_add_rule_and_should_download::Migration), Box::new(m20251009_123713_add_use_dynamic_api::Migration), Box::new(m20260324_055217_add_staff::Migration), + Box::new(m20260413_000001_add_danmaku_sync_fields::Migration), ] } } diff --git a/crates/bili_sync_migration/src/m20260413_000001_add_danmaku_sync_fields.rs b/crates/bili_sync_migration/src/m20260413_000001_add_danmaku_sync_fields.rs new file mode 100644 index 0000000..4a92f86 --- /dev/null +++ b/crates/bili_sync_migration/src/m20260413_000001_add_danmaku_sync_fields.rs @@ -0,0 +1,81 @@ +use sea_orm_migration::prelude::*; +use sea_orm_migration::schema::*; + +/// 为 page 表新增"弹幕增量更新"所需字段。 +/// +/// - `danmaku_last_synced_at`: 上次弹幕成功同步的时间戳(含首次下载成功),为空表示从未同步过。 +/// - `danmaku_sync_generation`: 弹幕同步阶段标记。0=未开始,1=新鲜期,2=成熟期,3=老化期,4=已冻结。 +/// - `danmaku_cid_snapshot`: 上次成功同步时使用的 cid,用于 UP 主换源检测。 +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // SQLite 不支持单条 ALTER TABLE 同时修改多列,必须拆分为独立语句。 + manager + .alter_table( + Table::alter() + .table(Page::Table) + .add_column(timestamp_null(Page::DanmakuLastSyncedAt)) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Page::Table) + .add_column( + ColumnDef::new(Page::DanmakuSyncGeneration) + .unsigned() + .not_null() + .default(0u32), + ) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Page::Table) + .add_column(big_integer_null(Page::DanmakuCidSnapshot)) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .alter_table( + Table::alter() + .table(Page::Table) + .drop_column(Page::DanmakuLastSyncedAt) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Page::Table) + .drop_column(Page::DanmakuSyncGeneration) + .to_owned(), + ) + .await?; + manager + .alter_table( + Table::alter() + .table(Page::Table) + .drop_column(Page::DanmakuCidSnapshot) + .to_owned(), + ) + .await + } +} + +#[derive(DeriveIden)] +enum Page { + Table, + DanmakuLastSyncedAt, + DanmakuSyncGeneration, + DanmakuCidSnapshot, +} diff --git a/web/package-lock.json b/web/package-lock.json index f9a496e..1e54129 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -1,12 +1,12 @@ { "name": "bili-sync-web", - "version": "2.9.4", + "version": "2.11.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "bili-sync-web", - "version": "2.9.4", + "version": "2.11.0", "dependencies": { "@types/qrcode": "^1.5.6", "qrcode": "^1.5.4" @@ -33,6 +33,7 @@ "layerchart": "^2.0.0-next.43", "mode-watcher": "^1.1.0", "prettier": "^3.7.4", + "prettier-plugin-organize-imports": "^4.3.0", "prettier-plugin-svelte": "^3.4.1", "prettier-plugin-tailwindcss": "^0.7.2", "svelte": "^5.46.1", @@ -773,7 +774,6 @@ "integrity": "sha512-oJrXtQiAXLvT9clCf1K4kxp3eKsQhIaZqxEyowkBcsvZDdZkbWrVmnGknxs5flTD0VGsxrxKgBCZty1EzoiMzA==", "dev": true, "license": "Apache-2.0", - "peer": true, "dependencies": { "@swc/helpers": "^0.5.0" } @@ -1277,7 +1277,6 @@ "integrity": "sha512-JFtOqDoU0DI/+QSG8qnq5bKcehVb3tCHhOG4amsSYth5/KgO4EkJvi42xSAiyKmXAAULW1/Zdb6lkgGEgSxdZg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@standard-schema/spec": "^1.0.0", "@sveltejs/acorn-typescript": "^1.0.5", @@ -1321,7 +1320,6 @@ "integrity": "sha512-ou/d51QSdTyN26D7h6dSpusAKaZkAiGM55/AKYi+9AGZw7q85hElbjK3kEyzXHhLSnRISHOYzVge6x0jRZ7DXA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@sveltejs/vite-plugin-svelte-inspector": "^5.0.0", "deepmerge": "^4.3.1", @@ -1781,7 +1779,6 @@ "integrity": "sha512-iIACsx8pxRnguSYhHiMn2PvhvfpopO9FXHyn1mG5txZIsAaB6F0KwbFnUQN3KCiG3Jcuad/Cao2FAs1Wp7vAyg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@typescript-eslint/scope-manager": "8.52.0", "@typescript-eslint/types": "8.52.0", @@ -1999,7 +1996,6 @@ "integrity": "sha512-NZyJarBfL7nWwIq+FDL6Zp/yHEhePMNnnJ0y3qfieCrmNvYct8uvtiV41UvlSe6apAfk0fY1FbWx+NwfmpvtTg==", "dev": true, "license": "MIT", - "peer": true, "bin": { "acorn": "bin/acorn" }, @@ -2774,7 +2770,6 @@ "integrity": "sha512-LEyamqS7W5HB3ujJyvi0HQK/dtVINZvd5mAAp9eT5S/ujByGjiZLCzPcHVzuXbpJDJF/cxwHlfceVUDZ2lnSTw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@eslint-community/eslint-utils": "^4.8.0", "@eslint-community/regexpp": "^4.12.1", @@ -4019,7 +4014,6 @@ "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -4056,7 +4050,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", @@ -4190,7 +4183,6 @@ "integrity": "sha512-v6UNi1+3hSlVvv8fSaoUbggEM5VErKmmpGA7Pl3HF8V6uKY7rvClBOJlH6yNwQtfTueNkGVpOv/mtWL9L4bgRA==", "dev": true, "license": "MIT", - "peer": true, "bin": { "prettier": "bin/prettier.cjs" }, @@ -4201,13 +4193,29 @@ "url": "https://github.com/prettier/prettier?sponsor=1" } }, + "node_modules/prettier-plugin-organize-imports": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/prettier-plugin-organize-imports/-/prettier-plugin-organize-imports-4.3.0.tgz", + "integrity": "sha512-FxFz0qFhyBsGdIsb697f/EkvHzi5SZOhWAjxcx2dLt+Q532bAlhswcXGYB1yzjZ69kW8UoadFBw7TyNwlq96Iw==", + "dev": true, + "license": "MIT", + "peerDependencies": { + "prettier": ">=2.0", + "typescript": ">=2.9", + "vue-tsc": "^2.1.0 || 3" + }, + "peerDependenciesMeta": { + "vue-tsc": { + "optional": true + } + } + }, "node_modules/prettier-plugin-svelte": { "version": "3.4.1", "resolved": "https://registry.npmjs.org/prettier-plugin-svelte/-/prettier-plugin-svelte-3.4.1.tgz", "integrity": "sha512-xL49LCloMoZRvSwa6IEdN2GV6cq2IqpYGstYtMT+5wmml1/dClEoI0MZR78MiVPpu6BdQFfN0/y73yO6+br5Pg==", "dev": true, "license": "MIT", - "peer": true, "peerDependencies": { "prettier": "^3.0.0", "svelte": "^3.2.0 || ^4.0.0-next.0 || ^5.0.0-next.0" @@ -4604,7 +4612,6 @@ "integrity": "sha512-ynjfCHD3nP2el70kN5Pmg37sSi0EjOm9FgHYQdC4giWG/hzO3AatzXXJJgP305uIhGQxSufJLuYWtkY8uK/8RA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@jridgewell/remapping": "^2.3.4", "@jridgewell/sourcemap-codec": "^1.5.0", @@ -4796,8 +4803,7 @@ "resolved": "https://registry.npmjs.org/tailwindcss/-/tailwindcss-4.1.18.tgz", "integrity": "sha512-4+Z+0yiYyEtUVCScyfHCxOYP06L5Ne+JiHhY2IjR2KWMIWhJOYZKLSGZaP5HkZ8+bY0cxfzwDE5uOmzFXyIwxw==", "dev": true, - "license": "MIT", - "peer": true + "license": "MIT" }, "node_modules/tapable": { "version": "2.3.0", @@ -4889,7 +4895,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -4951,7 +4956,6 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index 7e02098..b6d622a 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -14,6 +14,7 @@ import type { InsertSubmissionRequest, Notifier, QrcodePollResponse as PollQrcodeResponse, + RefreshDanmakuResponse, ResetFilteredVideosResponse, ResetFilteredVideoStatusRequest, ResetVideoResponse, @@ -287,6 +288,14 @@ class ApiClient { return this.post('/task/download'); } + async refreshDanmakuForVideo(id: number): Promise> { + return this.post(`/videos/${id}/refresh-danmaku`); + } + + async refreshDanmakuForPage(id: number): Promise> { + return this.post(`/pages/${id}/refresh-danmaku`); + } + async generateQrcode(): Promise> { return this.post('/login/qrcode/generate'); } @@ -345,6 +354,8 @@ const api = { updateConfig: (config: Config) => apiClient.updateConfig(config), getDashboard: () => apiClient.getDashboard(), triggerDownloadTask: () => apiClient.triggerDownloadTask(), + refreshDanmakuForVideo: (id: number) => apiClient.refreshDanmakuForVideo(id), + refreshDanmakuForPage: (id: number) => apiClient.refreshDanmakuForPage(id), generateQrcode: () => apiClient.generateQrcode(), pollQrcode: (qrcodeKey: string) => apiClient.pollQrcode(qrcodeKey), subscribeToSysInfo: (onMessage: (data: SysInfo) => void) => diff --git a/web/src/lib/consts.ts b/web/src/lib/consts.ts index c539b56..36c824f 100644 --- a/web/src/lib/consts.ts +++ b/web/src/lib/consts.ts @@ -6,3 +6,38 @@ export const VIDEO_SOURCES = { SUBMISSION: { type: 'submission', title: '用户投稿', icon: UserIcon }, WATCH_LATER: { type: 'watch_later', title: '稍后再看', icon: ClockIcon } }; + +/** + * 弹幕同步阶段标签映射,对应 Rust 端 `danmaku_sync_generation` 字段。 + * 0=未开始;首次同步前不展示 badge。 + */ +export const DANMAKU_GENERATION_LABELS: Record< + number, + { text: string; variant: 'default' | 'secondary' | 'outline' | 'destructive' } +> = { + 0: { text: '待更新', variant: 'outline' }, + 1: { text: '新鲜期', variant: 'default' }, + 2: { text: '成熟期', variant: 'secondary' }, + 3: { text: '老化期', variant: 'outline' }, + 4: { text: '已冻结', variant: 'outline' } +}; + +/** 将任意可解析为日期的字符串格式化为相对时间("2 小时前")。 */ +export function formatRelativeTime(input: string | Date | null | undefined): string { + if (!input) return '从未同步'; + const then = typeof input === 'string' ? new Date(input.replace(' ', 'T') + 'Z') : input; + const diff = Date.now() - then.getTime(); + if (Number.isNaN(diff)) return '从未同步'; + if (diff < 0) return '刚刚'; + const minutes = Math.floor(diff / 60_000); + if (minutes < 1) return '刚刚'; + if (minutes < 60) return `${minutes} 分钟前`; + const hours = Math.floor(minutes / 60); + if (hours < 24) return `${hours} 小时前`; + const days = Math.floor(hours / 24); + if (days < 30) return `${days} 天前`; + const months = Math.floor(days / 30); + if (months < 12) return `${months} 个月前`; + const years = Math.floor(days / 365); + return `${years} 年前`; +} diff --git a/web/src/lib/types.ts b/web/src/lib/types.ts index ef86087..02f6437 100644 --- a/web/src/lib/types.ts +++ b/web/src/lib/types.ts @@ -48,9 +48,18 @@ export interface VideosResponse { export interface PageInfo { id: number; + video_id: number; pid: number; name: string; download_status: [number, number, number, number, number]; + danmaku_last_synced_at: string | null; + /** 弹幕同步阶段:0=未开始 1=新鲜期 2=成熟期 3=老化期 4=冷冻 */ + danmaku_sync_generation: number; + danmaku_cid_snapshot: number | null; +} + +export interface RefreshDanmakuResponse { + refreshed: number; } export interface VideoResponse { @@ -276,6 +285,23 @@ export interface DanmakuOption { time_offset: number; } +/** + * 弹幕增量更新策略(三段式)。 + * - 新鲜期:发布 fresh_days 天内每 fresh_interval_hours 小时更新一次; + * - 成熟期:到 mature_days 之间每 mature_interval_days 天更新一次; + * - 老化期:到 cold_days 之间每 cold_interval_days 天更新一次; + * - 冷冻:超过 cold_days 后最后刷新一次,之后不再自动更新(手动触发仍可)。 + */ +export interface DanmakuUpdatePolicy { + enabled: boolean; + fresh_days: number; + fresh_interval_hours: number; + mature_days: number; + mature_interval_days: number; + cold_days: number; + cold_interval_days: number; +} + export interface SkipOption { no_poster: boolean; no_video_nfo: boolean; @@ -340,6 +366,7 @@ export interface Config { time_format: string; cdn_sorting: boolean; try_upower_anyway: boolean; + danmaku_update_policy: DanmakuUpdatePolicy; version: number; } diff --git a/web/src/routes/settings/+page.svelte b/web/src/routes/settings/+page.svelte index 9716a1c..5497792 100644 --- a/web/src/routes/settings/+page.svelte +++ b/web/src/routes/settings/+page.svelte @@ -787,6 +787,103 @@ + + + + +
+
+
+

弹幕增量更新

+

+ 视频下载完成后按策略周期性重抓弹幕。依据发布时长分为三段:新鲜期高频、成熟期中频、老化期低频;超过冷冻阈值后触发最后一次刷新并冻结,不再自动更新(可手动触发)。 +

+
+
+ + +
+
+ + {#if formData.danmaku_update_policy.enabled} +
+
+
+ + +

发布后这段时间内认为弹幕快速增长

+
+
+ + +
+
+
+
+ + +

必须 ≥ 新鲜期天数

+
+
+ + +
+
+
+
+ + +

+ 超过该阈值后触发最后一次刷新并冻结,必须 ≥ 成熟期天数 +

+
+
+ + +
+
+
+ {/if} +
diff --git a/web/src/routes/video/[id]/+page.svelte b/web/src/routes/video/[id]/+page.svelte index 364b102..1cbfed3 100644 --- a/web/src/routes/video/[id]/+page.svelte +++ b/web/src/routes/video/[id]/+page.svelte @@ -6,11 +6,18 @@ import api from '$lib/api'; import SquareArrowOutUpRightIcon from '@lucide/svelte/icons/square-arrow-out-up-right'; import type { ApiError, VideoResponse, UpdateVideoStatusRequest } from '$lib/types'; - import { RotateCcwIcon, SquarePenIcon, BrushCleaningIcon } from '@lucide/svelte/icons'; + import { + RotateCcwIcon, + SquarePenIcon, + BrushCleaningIcon, + RefreshCwIcon + } from '@lucide/svelte/icons'; import { setBreadcrumb } from '$lib/stores/breadcrumb'; import { appStateStore, ToQuery } from '$lib/stores/filter'; import VideoCard from '$lib/components/video-card.svelte'; import StatusEditor from '$lib/components/status-editor.svelte'; + import { Badge } from '$lib/components/ui/badge/index.js'; + import { DANMAKU_GENERATION_LABELS, formatRelativeTime } from '$lib/consts'; import { toast } from 'svelte-sonner'; let videoData: VideoResponse | null = null; @@ -22,6 +29,8 @@ let clearAndResetting = false; let statusEditorOpen = false; let statusEditorLoading = false; + let refreshingDanmaku = false; + let refreshingPageDanmaku = new Set(); async function loadVideoDetail() { const videoId = parseInt($page.params.id!); @@ -113,6 +122,44 @@ } } + async function handleRefreshDanmaku() { + if (!videoData || refreshingDanmaku) return; + refreshingDanmaku = true; + try { + const result = await api.refreshDanmakuForVideo(videoData.video.id); + toast.success('弹幕刷新完成', { + description: `已成功刷新 ${result.data.refreshed} 个分页` + }); + await loadVideoDetail(); + } catch (error) { + console.error('弹幕刷新失败:', error); + toast.error('弹幕刷新失败', { + description: (error as ApiError).message + }); + } finally { + refreshingDanmaku = false; + } + } + + async function handleRefreshPageDanmaku(pageId: number) { + if (refreshingPageDanmaku.has(pageId)) return; + refreshingPageDanmaku = new Set([...refreshingPageDanmaku, pageId]); + try { + await api.refreshDanmakuForPage(pageId); + toast.success('弹幕刷新完成'); + await loadVideoDetail(); + } catch (error) { + console.error('弹幕刷新失败:', error); + toast.error('弹幕刷新失败', { + description: (error as ApiError).message + }); + } finally { + const next = new Set(refreshingPageDanmaku); + next.delete(pageId); + refreshingPageDanmaku = next; + } + } + async function handleClearAndReset() { if (!videoData) return; try { @@ -196,6 +243,17 @@ 清空重置 + + + {/each}