mirror of
https://github.com/amtoaer/bili-sync.git
synced 2026-06-27 18:32:43 +08:00
feat: 加入精妙的并发数量限制
This commit is contained in:
40
processor.py
40
processor.py
@@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
from asyncio import create_subprocess_exec
|
||||
from asyncio import Semaphore, create_subprocess_exec
|
||||
from asyncio.subprocess import DEVNULL
|
||||
from pathlib import Path
|
||||
|
||||
@@ -16,6 +16,19 @@ from settings import settings
|
||||
anchor = datetime.datetime.today()
|
||||
|
||||
|
||||
def concurrent_decorator(concurrency: int) -> callable:
|
||||
sem = Semaphore(value=concurrency)
|
||||
|
||||
def decorator(func: callable) -> callable:
|
||||
async def wrapper(*args, **kwargs) -> any:
|
||||
async with sem:
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
async def download_content(url: str, path: Path):
|
||||
async with httpx.AsyncClient(headers=HEADERS) as sess:
|
||||
resp = await sess.get(url)
|
||||
@@ -50,8 +63,9 @@ async def process():
|
||||
async def process_favorite(favorite_id: int) -> None:
|
||||
save_path = Path(settings.path_mapper[favorite_id])
|
||||
save_path.mkdir(parents=True, exist_ok=True)
|
||||
page = 1
|
||||
page, tasks = 0, []
|
||||
while True:
|
||||
page += 1
|
||||
favorite_video_list = (
|
||||
await favorite_list.get_video_favorite_list_content(
|
||||
favorite_id, page=page, credential=credential
|
||||
@@ -63,22 +77,18 @@ async def process_favorite(favorite_id: int) -> None:
|
||||
favorite_id,
|
||||
favorite_video_list["info"]["title"],
|
||||
)
|
||||
for i in range(0, len(favorite_video_list["medias"]), 4):
|
||||
medias = favorite_video_list["medias"][i : i + 4]
|
||||
tasks = [process_video(save_path, media) for media in medias]
|
||||
video_result = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
for idx, result in enumerate(video_result):
|
||||
if isinstance(result, Exception):
|
||||
logger.error(
|
||||
"Failed to process video {}: {}",
|
||||
medias[idx]["title"],
|
||||
result,
|
||||
)
|
||||
tasks.extend(
|
||||
[
|
||||
process_video(save_path, media)
|
||||
for media in favorite_video_list["medias"]
|
||||
]
|
||||
)
|
||||
if not favorite_video_list["has_more"]:
|
||||
return
|
||||
page += 1
|
||||
break
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
|
||||
@concurrent_decorator(4)
|
||||
async def process_video(save_path: Path, media: dict) -> None:
|
||||
title = media["title"]
|
||||
safe_title = media["title"].replace("/", "_")
|
||||
|
||||
Reference in New Issue
Block a user