diff --git a/crates/bili_sync/src/workflow.rs b/crates/bili_sync/src/workflow.rs index bd44303..14f502e 100644 --- a/crates/bili_sync/src/workflow.rs +++ b/crates/bili_sync/src/workflow.rs @@ -5,13 +5,12 @@ use std::pin::Pin; use anyhow::{Context, Result, anyhow, bail}; use bili_sync_entity::upper_vec::Upper; use bili_sync_entity::*; -use futures::stream::FuturesUnordered; +use futures::stream::{self, FuturesUnordered}; use futures::{Stream, StreamExt, TryStreamExt}; use sea_orm::ActiveValue::Set; use sea_orm::TransactionTrait; use sea_orm::entity::prelude::*; use tokio::fs; -use tokio::sync::Semaphore; use crate::adapter::{VideoSource, VideoSourceEnum}; use crate::bilibili::{BestStream, BiliClient, BiliError, Dimension, PageInfo, Video, VideoInfo}; @@ -126,12 +125,8 @@ pub async fn fetch_video_details( ) -> Result<()> { video_source.log_fetch_video_start(); let videos_model = filter_unfilled_videos(video_source.filter_expr(), connection).await?; - let semaphore = Semaphore::new(config.concurrent_limit.video); - let semaphore_ref = &semaphore; - let tasks = videos_model - .into_iter() + let tasks = stream::iter(videos_model) .map(|video_model| async move { - let _permit = semaphore_ref.acquire().await.context("acquire semaphore failed")?; let video = Video::new(bili_client, video_model.bvid.as_str(), &config.credential); let info: Result<_> = async { Ok((video.get_tags().await?, video.get_view_info().await?)) }.await; match info { @@ -170,7 +165,7 @@ pub async fn fetch_video_details( }; Ok::<_, anyhow::Error>(()) }) - .collect::>(); + .buffer_unordered(config.concurrent_limit.video); tasks.try_collect::<()>().await?; video_source.log_fetch_video_end(); Ok(()) @@ -185,13 +180,11 @@ pub async fn download_unprocessed_videos( config: &Config, ) -> Result { video_source.log_download_video_start(); - let semaphore = Semaphore::new(config.concurrent_limit.video); let downloader = Downloader::new(bili_client.client.clone()); let cx = DownloadContext::new(bili_client, video_source, template, connection, &downloader, config); let unhandled_videos_pages = filter_unhandled_video_pages(video_source.filter_expr(), connection).await?; let mut assigned_upper_ids = HashSet::new(); - let tasks = unhandled_videos_pages - .into_iter() + let tasks = stream::iter(unhandled_videos_pages) .map(|(video_model, pages_model)| { // 这里按理说是可以直接拿到 assigned_uppers 的,但rust 会错误地认为它引用了 local variable // 导致编译出错,暂时先这样单独提取出一个 owned 的 upper id 列表,再在任务内部筛选 @@ -200,9 +193,9 @@ pub async fn download_unprocessed_videos( .map(|u| u.mid) .filter(|uid| assigned_upper_ids.insert(*uid)) .collect::>(); - download_video_pages(video_model, pages_model, &semaphore, task_uids, cx) + download_video_pages(video_model, pages_model, task_uids, cx) }) - .collect::>(); + .buffer_unordered(config.concurrent_limit.video); let mut risk_control_related_error = None; let mut stream = tasks // 触发风控时设置 download_aborted 标记并终止流 @@ -234,11 +227,9 @@ pub async fn download_unprocessed_videos( pub async fn download_video_pages( video_model: video::Model, page_models: Vec, - semaphore: &Semaphore, upper_uids: Vec, cx: DownloadContext<'_>, ) -> Result { - let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = VideoStatus::from(video_model.download_status); let separate_status = status.should_run(); // 未记录路径时填充,已经填充过路径时使用现有的 @@ -350,11 +341,9 @@ pub async fn dispatch_download_page( if !should_run { return Ok(ExecutionStatus::Skipped); } - let child_semaphore = Semaphore::new(cx.config.concurrent_limit.page); - let tasks = page_models - .into_iter() - .map(|page_model| download_page(video_model, page_model, &child_semaphore, base_path, cx)) - .collect::>(); + let tasks = stream::iter(page_models) + .map(|page_model| download_page(video_model, page_model, base_path, cx)) + .buffer_unordered(cx.config.concurrent_limit.page); let (mut risk_control_related_error, mut target_status) = (None, STATUS_OK); let mut stream = tasks .take_while(|res| { @@ -397,11 +386,9 @@ pub async fn dispatch_download_page( pub async fn download_page( video_model: &video::Model, page_model: page::Model, - semaphore: &Semaphore, base_path: &Path, cx: DownloadContext<'_>, ) -> Result { - let _permit = semaphore.acquire().await.context("acquire semaphore failed")?; let mut status = PageStatus::from(page_model.download_status); let separate_status = status.should_run(); let is_single_page = video_model.single_page.context("single_page is null")?;