From bf762b1b0ac950c24191a01d1cf8f8cbadf768ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Wed, 24 Jun 2026 01:22:38 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E6=89=B9=E9=87=8F=E5=A1=AB=E5=85=85?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E8=AF=A6=E6=83=85=EF=BC=8C=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E8=BF=87=E5=A4=9A=E4=BA=8B=E5=8A=A1=20(#725)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/bili_sync/src/utils/model.rs | 101 ++++++++++++++++++++++++++-- crates/bili_sync/src/workflow.rs | 50 +++++++++----- 2 files changed, 132 insertions(+), 19 deletions(-) diff --git a/crates/bili_sync/src/utils/model.rs b/crates/bili_sync/src/utils/model.rs index 43804cc..7d009e7 100644 --- a/crates/bili_sync/src/utils/model.rs +++ b/crates/bili_sync/src/utils/model.rs @@ -1,10 +1,11 @@ use anyhow::{Context, Result, anyhow}; use bili_sync_entity::*; +use itertools::Itertools; use rand::seq::SliceRandom; use sea_orm::ActiveValue::Set; -use sea_orm::DatabaseTransaction; use sea_orm::entity::prelude::*; -use sea_orm::sea_query::{OnConflict, SimpleExpr}; +use sea_orm::sea_query::{Expr, OnConflict, SimpleExpr}; +use sea_orm::{ConnectionTrait, DatabaseTransaction, IdenStatic, Statement}; use crate::adapter::{VideoSource, VideoSourceEnum}; use crate::bilibili::VideoInfo; @@ -76,8 +77,16 @@ pub async fn create_videos( /// 尝试创建 Page Model,如果发生冲突则忽略 pub async fn create_pages(pages_model: Vec, connection: &DatabaseTransaction) -> Result<()> { - for page_chunk in pages_model.chunks(200) { - page::Entity::insert_many(page_chunk.to_vec()) + let mut pages = pages_model.into_iter(); + loop { + // 这里 insert_many 要求 IntoIterator,vec 上调用 chunks 返回的类型不匹配,需要 to_vec 做 clone + // itertools 的 into_iter().chunks() 由于 !Send 也无法直接使用 + // 暂时手写 take + collect 作为避免 clone 的折中方案 + let page_chunk = pages.by_ref().take(200).collect::>(); + if page_chunk.is_empty() { + break; + } + page::Entity::insert_many(page_chunk) .on_conflict( OnConflict::columns([page::Column::VideoId, page::Column::Pid]) .do_nothing() @@ -90,6 +99,90 @@ pub async fn create_pages(pages_model: Vec, connection: &Data Ok(()) } +/// 更新视频 model 的详情字段 +pub async fn update_video_detail_models( + videos: Vec, + connection: &DatabaseTransaction, +) -> Result<()> { + if videos.is_empty() { + return Ok(()); + } + let columns = [ + video::Column::Id, + video::Column::CollectionId, + video::Column::FavoriteId, + video::Column::WatchLaterId, + video::Column::SubmissionId, + video::Column::UpperId, + video::Column::UpperName, + video::Column::UpperFace, + video::Column::Staff, + video::Column::Name, + video::Column::Bvid, + video::Column::Intro, + video::Column::Cover, + video::Column::Ctime, + video::Column::Pubtime, + video::Column::Favtime, + video::Column::DownloadStatus, + video::Column::Valid, + video::Column::ShouldDownload, + video::Column::Tags, + video::Column::SinglePage, + ]; + let row = format!("({})", std::iter::repeat_n("?", columns.len()).join(", ")); + let rows = std::iter::repeat_n(row.as_str(), videos.len()).join(", "); + let mut values = Vec::with_capacity(videos.len() * columns.len()); + for video in videos { + for column in columns { + values.push( + video + .get(column) + .into_value() + .ok_or_else(|| anyhow!("video column {} is not set", column.as_str()))?, + ); + } + } + let sql = format!( + "WITH tempdata({}) AS (VALUES {}) \ + UPDATE video \ + SET {} \ + FROM tempdata \ + WHERE video.id = tempdata.id", + columns.iter().map(IdenStatic::as_str).join(", "), + rows, + columns + .iter() + .skip(1) + .map(|column| { + let column = column.as_str(); + format!("{} = tempdata.{}", column, column) + }) + .join(", ") + ); + connection + .execute(Statement::from_sql_and_values( + connection.get_database_backend(), + sql, + values, + )) + .await?; + Ok(()) +} + +/// 将视频标记为失效 +pub async fn set_video_models_invalid(video_ids: Vec, connection: &DatabaseTransaction) -> Result<()> { + if video_ids.is_empty() { + return Ok(()); + } + video::Entity::update_many() + .filter(video::Column::Id.is_in(video_ids)) + .col_expr(video::Column::Valid, Expr::value(false)) + .exec(connection) + .await?; + Ok(()) +} + /// 更新视频 model 的下载状态 pub async fn update_videos_model(videos: Vec, connection: &DatabaseConnection) -> Result<()> { video::Entity::insert_many(videos) diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index 14f502e..7db083d 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -21,14 +21,20 @@ use crate::notifier::DownloadNotifyInfo; use crate::utils::download_context::DownloadContext; use crate::utils::format_arg::{page_format_args, video_format_args}; use crate::utils::model::{ - create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model, - update_videos_model, + create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, set_video_models_invalid, + update_pages_model, update_video_detail_models, update_videos_model, }; use crate::utils::nfo::{NFO, ToNFO}; use crate::utils::notify::notify; use crate::utils::rule::FieldEvaluatable; use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus}; +#[allow(clippy::large_enum_variant)] +enum VideoDetailUpdate { + Invalid(i32), + Detail(Vec, video::ActiveModel), +} + /// 完整地处理某个视频来源 pub async fn process_video_source( video_source: VideoSourceEnum, @@ -98,7 +104,7 @@ pub async fn refresh_video_source<'a>( } }) .filter_map(|(idx, res)| futures::future::ready(video_source.should_filter(idx, res, &latest_row_at))) - .chunks(10); + .chunks(30); let mut count = 0; while let Some(videos_info) = video_streams.next().await { count += videos_info.len(); @@ -125,7 +131,7 @@ pub async fn fetch_video_details( ) -> Result<()> { video_source.log_fetch_video_start(); let videos_model = filter_unfilled_videos(video_source.filter_expr(), connection).await?; - let tasks = stream::iter(videos_model) + let mut tasks = stream::iter(videos_model) .map(|video_model| async move { let video = Video::new(bili_client, video_model.bvid.as_str(), &config.credential); let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; @@ -136,9 +142,9 @@ pub async fn fetch_video_details( &video_model.bvid, &video_model.name, e ); if let Some(BiliError::ErrorResponse { code: -404, .. }) = e.downcast_ref::() { - let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into(); - video_active_model.valid = Set(false); - video_active_model.save(connection).await?; + Some(VideoDetailUpdate::Invalid(video_model.id)) + } else { + None } } Ok((tags, mut view_info)) => { @@ -157,16 +163,30 @@ pub async fn fetch_video_details( video_active_model.single_page = Set(Some(pages.len() == 1)); video_active_model.tags = Set(Some(tags.into())); video_active_model.should_download = Set(video_source.rule().evaluate(&video_active_model, &pages)); - let txn = connection.begin().await?; - create_pages(pages, &txn).await?; - video_active_model.save(&txn).await?; - txn.commit().await?; + Some(VideoDetailUpdate::Detail(pages, video_active_model)) } - }; - Ok::<_, anyhow::Error>(()) + } }) - .buffer_unordered(config.concurrent_limit.video); - tasks.try_collect::<()>().await?; + .buffer_unordered(config.concurrent_limit.video) + .filter_map(futures::future::ready) + .chunks(15); + while let Some(details) = tasks.next().await { + let mut invalid_video_ids = Vec::new(); + let mut pages = Vec::new(); + let mut videos = Vec::new(); + details.into_iter().for_each(|detail| match detail { + VideoDetailUpdate::Invalid(video_id) => invalid_video_ids.push(video_id), + VideoDetailUpdate::Detail(video_pages, video) => { + pages.extend(video_pages); + videos.push(video); + } + }); + let txn = connection.begin().await?; + update_video_detail_models(videos, &txn).await?; + set_video_models_invalid(invalid_video_ids, &txn).await?; + create_pages(pages, &txn).await?; + txn.commit().await?; + } video_source.log_fetch_video_end(); Ok(()) }