perf: 将手动 semaphore 替换为 stream::buffer_unordered 以减少内存和调度开销 (#723)

This commit is contained in:
ᴀᴍᴛᴏᴀᴇʀ
2026-06-23 10:22:26 +08:00
committed by GitHub
parent b60577cbc2
commit de335befaf

View File

@@ -5,13 +5,12 @@ use std::pin::Pin;
use anyhow::{Context, Result, anyhow, bail};
use bili_sync_entity::upper_vec::Upper;
use bili_sync_entity::*;
use futures::stream::FuturesUnordered;
use futures::stream::{self, FuturesUnordered};
use futures::{Stream, StreamExt, TryStreamExt};
use sea_orm::ActiveValue::Set;
use sea_orm::TransactionTrait;
use sea_orm::entity::prelude::*;
use tokio::fs;
use tokio::sync::Semaphore;
use crate::adapter::{VideoSource, VideoSourceEnum};
use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo};
@@ -126,12 +125,8 @@ pub async fn fetch_video_details(
) -> Result<()> {
video_source.log_fetch_video_start();
let videos_model = filter_unfilled_videos(video_source.filter_expr(), connection).await?;
let semaphore = Semaphore::new(config.concurrent_limit.video);
let semaphore_ref = &semaphore;
let tasks = videos_model
.into_iter()
let tasks = stream::iter(videos_model)
.map(|video_model| async move {
let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?;
let video = Video::new(bili_client, video_model.bvid.as_str(), &config.credential);
let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await;
match info {
@@ -170,7 +165,7 @@ pub async fn fetch_video_details(
};
Ok::<_, anyhow::Error>(())
})
.collect::<FuturesUnordered<_>>();
.buffer_unordered(config.concurrent_limit.video);
tasks.try_collect::<()>().await?;
video_source.log_fetch_video_end();
Ok(())
@@ -185,13 +180,11 @@ pub async fn download_unprocessed_videos(
config: &Config,
) -> Result<DownloadNotifyInfo> {
video_source.log_download_video_start();
let semaphore = Semaphore::new(config.concurrent_limit.video);
let downloader = Downloader::new(bili_client.client.clone());
let cx = DownloadContext::new(bili_client, video_source, template, connection, &downloader, config);
let unhandled_videos_pages = filter_unhandled_video_pages(video_source.filter_expr(), connection).await?;
let mut assigned_upper_ids = HashSet::new();
let tasks = unhandled_videos_pages
.into_iter()
let tasks = stream::iter(unhandled_videos_pages)
.map(|(video_model, pages_model)| {
// 这里按理说是可以直接拿到 assigned_uppers 的但rust 会错误地认为它引用了 local variable
// 导致编译出错,暂时先这样单独提取出一个 owned 的 upper id 列表,再在任务内部筛选
@@ -200,9 +193,9 @@ pub async fn download_unprocessed_videos(
.map(|u| u.mid)
.filter(|uid| assigned_upper_ids.insert(*uid))
.collect::<Vec<_>>();
download_video_pages(video_model, pages_model, &semaphore, task_uids, cx)
download_video_pages(video_model, pages_model, task_uids, cx)
})
.collect::<FuturesUnordered<_>>();
.buffer_unordered(config.concurrent_limit.video);
let mut risk_control_related_error = None;
let mut stream = tasks
// 触发风控时设置 download_aborted 标记并终止流
@@ -234,11 +227,9 @@ pub async fn download_unprocessed_videos(
pub async fn download_video_pages(
video_model: video::Model,
page_models: Vec<page::Model>,
semaphore: &Semaphore,
upper_uids: Vec<i64>,
cx: DownloadContext<'_>,
) -> Result<video::ActiveModel> {
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
let mut status = VideoStatus::from(video_model.download_status);
let separate_status = status.should_run();
// 未记录路径时填充,已经填充过路径时使用现有的
@@ -350,11 +341,9 @@ pub async fn dispatch_download_page(
if !should_run {
return Ok(ExecutionStatus::Skipped);
}
let child_semaphore = Semaphore::new(cx.config.concurrent_limit.page);
let tasks = page_models
.into_iter()
.map(|page_model| download_page(video_model, page_model, &child_semaphore, base_path, cx))
.collect::<FuturesUnordered<_>>();
let tasks = stream::iter(page_models)
.map(|page_model| download_page(video_model, page_model, base_path, cx))
.buffer_unordered(cx.config.concurrent_limit.page);
let (mut risk_control_related_error, mut target_status) = (None, STATUS_OK);
let mut stream = tasks
.take_while(|res| {
@@ -397,11 +386,9 @@ pub async fn dispatch_download_page(
pub async fn download_page(
video_model: &video::Model,
page_model: page::Model,
semaphore: &Semaphore,
base_path: &Path,
cx: DownloadContext<'_>,
) -> Result<page::ActiveModel> {
let _permit = semaphore.acquire().await.context("acquire semaphore failed")?;
let mut status = PageStatus::from(page_model.download_status);
let separate_status = status.should_run();
let is_single_page = video_model.single_page.context("single_page is null")?;