perf: 批量填充视频详情,避免过多事务 (#725)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2026-06-24 01:22:38 +08:00
committed by GitHub
parent de335befaf
commit bf762b1b0a
2 changed files with 132 additions and 19 deletions

View File

@@ -1,10 +1,11 @@
use anyhow::{Context, Result, anyhow};
use bili_sync_entity::*;
use itertools::Itertools;
use rand::seq::SliceRandom;
use sea_orm::ActiveValue::Set;
use sea_orm::DatabaseTransaction;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::{OnConflict, SimpleExpr};
use sea_orm::sea_query::{Expr, OnConflict, SimpleExpr};
use sea_orm::{ConnectionTrait, DatabaseTransaction, IdenStatic, Statement};
use crate::adapter::{VideoSource, VideoSourceEnum};
use crate::bilibili::VideoInfo;
@@ -76,8 +77,16 @@ pub async fn create_videos(
/// 尝试创建 Page Model如果发生冲突则忽略
pub async fn create_pages(pages_model: Vec<page::ActiveModel>, connection: &DatabaseTransaction) -> Result<()> {
for page_chunk in pages_model.chunks(200) {
page::Entity::insert_many(page_chunk.to_vec())
let mut pages = pages_model.into_iter();
loop {
// 这里 insert_many 要求 IntoIteratorvec 上调用 chunks 返回的类型不匹配,需要 to_vec 做 clone
// itertools 的 into_iter().chunks() 由于 !Send 也无法直接使用
// 暂时手写 take + collect 作为避免 clone 的折中方案
let page_chunk = pages.by_ref().take(200).collect::<Vec<_>>();
if page_chunk.is_empty() {
break;
}
page::Entity::insert_many(page_chunk)
.on_conflict(
OnConflict::columns([page::Column::VideoId, page::Column::Pid])
.do_nothing()
@@ -90,6 +99,90 @@ pub async fn create_pages(pages_model: Vec<page::ActiveModel>, connection: &Data
Ok(())
}
/// 更新视频 model 的详情字段
pub async fn update_video_detail_models(
videos: Vec<video::ActiveModel>,
connection: &DatabaseTransaction,
) -> Result<()> {
if videos.is_empty() {
return Ok(());
}
let columns = [
video::Column::Id,
video::Column::CollectionId,
video::Column::FavoriteId,
video::Column::WatchLaterId,
video::Column::SubmissionId,
video::Column::UpperId,
video::Column::UpperName,
video::Column::UpperFace,
video::Column::Staff,
video::Column::Name,
video::Column::Bvid,
video::Column::Intro,
video::Column::Cover,
video::Column::Ctime,
video::Column::Pubtime,
video::Column::Favtime,
video::Column::DownloadStatus,
video::Column::Valid,
video::Column::ShouldDownload,
video::Column::Tags,
video::Column::SinglePage,
];
let row = format!("({})", std::iter::repeat_n("?", columns.len()).join(", "));
let rows = std::iter::repeat_n(row.as_str(), videos.len()).join(", ");
let mut values = Vec::with_capacity(videos.len() * columns.len());
for video in videos {
for column in columns {
values.push(
video
.get(column)
.into_value()
.ok_or_else(|| anyhow!("video column {} is not set", column.as_str()))?,
);
}
}
let sql = format!(
"WITH tempdata({}) AS (VALUES {}) \
UPDATE video \
SET {} \
FROM tempdata \
WHERE video.id = tempdata.id",
columns.iter().map(IdenStatic::as_str).join(", "),
rows,
columns
.iter()
.skip(1)
.map(|column| {
let column = column.as_str();
format!("{} = tempdata.{}", column, column)
})
.join(", ")
);
connection
.execute(Statement::from_sql_and_values(
connection.get_database_backend(),
sql,
values,
))
.await?;
Ok(())
}
/// 将视频标记为失效
pub async fn set_video_models_invalid(video_ids: Vec<i32>, connection: &DatabaseTransaction) -> Result<()> {
if video_ids.is_empty() {
return Ok(());
}
video::Entity::update_many()
.filter(video::Column::Id.is_in(video_ids))
.col_expr(video::Column::Valid, Expr::value(false))
.exec(connection)
.await?;
Ok(())
}
/// 更新视频 model 的下载状态
pub async fn update_videos_model(videos: Vec<video::ActiveModel>, connection: &DatabaseConnection) -> Result<()> {
video::Entity::insert_many(videos)

View File

@@ -21,14 +21,20 @@ use crate::notifier::DownloadNotifyInfo;
use crate::utils::download_context::DownloadContext;
use crate::utils::format_arg::{page_format_args, video_format_args};
use crate::utils::model::{
create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, update_pages_model,
update_videos_model,
create_pages, create_videos, filter_unfilled_videos, filter_unhandled_video_pages, set_video_models_invalid,
update_pages_model, update_video_detail_models, update_videos_model,
};
use crate::utils::nfo::{NFO, ToNFO};
use crate::utils::notify::notify;
use crate::utils::rule::FieldEvaluatable;
use crate::utils::status::{PageStatus, STATUS_OK, VideoStatus};
#[allow(clippy::large_enum_variant)]
enum VideoDetailUpdate {
Invalid(i32),
Detail(Vec<page::ActiveModel>, video::ActiveModel),
}
/// 完整地处理某个视频来源
pub async fn process_video_source(
video_source: VideoSourceEnum,
@@ -98,7 +104,7 @@ pub async fn refresh_video_source<'a>(
}
})
.filter_map(|(idx, res)| futures::future::ready(video_source.should_filter(idx, res, &latest_row_at)))
.chunks(10);
.chunks(30);
let mut count = 0;
while let Some(videos_info) = video_streams.next().await {
count += videos_info.len();
@@ -125,7 +131,7 @@ pub async fn fetch_video_details(
) -> Result<()> {
video_source.log_fetch_video_start();
let videos_model = filter_unfilled_videos(video_source.filter_expr(), connection).await?;
let tasks = stream::iter(videos_model)
let mut tasks = stream::iter(videos_model)
.map(|video_model| async move {
let video = Video::new(bili_client, video_model.bvid.as_str(), &config.credential);
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
@@ -136,9 +142,9 @@ pub async fn fetch_video_details(
&video_model.bvid, &video_model.name, e
);
if let Some(BiliError::ErrorResponse { code: -404, .. }) = e.downcast_ref::<BiliError>() {
let mut video_active_model: bili_sync_entity::video::ActiveModel = video_model.into();
video_active_model.valid = Set(false);
video_active_model.save(connection).await?;
Some(VideoDetailUpdate::Invalid(video_model.id))
} else {
None
}
}
Ok((tags, mut view_info)) => {
@@ -157,16 +163,30 @@ pub async fn fetch_video_details(
video_active_model.single_page = Set(Some(pages.len() == 1));
video_active_model.tags = Set(Some(tags.into()));
video_active_model.should_download = Set(video_source.rule().evaluate(&video_active_model, &pages));
let txn = connection.begin().await?;
create_pages(pages, &txn).await?;
video_active_model.save(&txn).await?;
txn.commit().await?;
Some(VideoDetailUpdate::Detail(pages, video_active_model))
}
};
Ok::<_, anyhow::Error>(())
}
})
.buffer_unordered(config.concurrent_limit.video);
tasks.try_collect::<()>().await?;
.buffer_unordered(config.concurrent_limit.video)
.filter_map(futures::future::ready)
.chunks(15);
while let Some(details) = tasks.next().await {
let mut invalid_video_ids = Vec::new();
let mut pages = Vec::new();
let mut videos = Vec::new();
details.into_iter().for_each(|detail| match detail {
VideoDetailUpdate::Invalid(video_id) => invalid_video_ids.push(video_id),
VideoDetailUpdate::Detail(video_pages, video) => {
pages.extend(video_pages);
videos.push(video);
}
});
let txn = connection.begin().await?;
update_video_detail_models(videos, &txn).await?;
set_video_models_invalid(invalid_video_ids, &txn).await?;
create_pages(pages, &txn).await?;
txn.commit().await?;
}
video_source.log_fetch_video_end();
Ok(())
}