From 6f8b4afa1c26b6276ca0c93638cc51194e1b7d2a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E1=B4=80=E1=B4=8D=E1=B4=9B=E1=B4=8F=E1=B4=80=E1=B4=87?= =?UTF-8?q?=CA=80?= Date: Wed, 22 Nov 2023 20:45:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E5=85=A8=E5=B9=B6?= =?UTF-8?q?=E8=A1=8C=E4=B8=8B=E8=BD=BD=20(#1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- processor.py | 143 +++++++++++++++++++++++++++------------------------ 1 file changed, 77 insertions(+), 66 deletions(-) diff --git a/processor.py b/processor.py index d661202..c9fdda6 100644 --- a/processor.py +++ b/processor.py @@ -7,6 +7,7 @@ import httpx from asyncio import create_subprocess_exec from asyncio.subprocess import DEVNULL from loguru import logger +import asyncio async def download_content(url: str, path: Path): @@ -20,74 +21,84 @@ async def download_content(url: str, path: Path): async def process(): + favorite_ids, tasks = [], [] + for favorite_id in settings.favorite_ids: if favorite_id not in settings.path_mapper: logger.warning(f"Favorite {favorite_id} not in path mapper, ignored.") continue - try: - save_path = Path(settings.path_mapper[favorite_id]) - save_path.mkdir(parents=True, exist_ok=True) - favorite_video_list = await favorite_list.get_video_favorite_list_content( - favorite_id, credential=credential - ) - logger.info( - "start to process favorite {}", favorite_video_list["info"]["title"] - ) - # TODO: - # 1. 添加进度条 - # 2. 翻页以下载全部内容 - # 3. 对接数据库 - # 4. 构建 nfo - for media in favorite_video_list["medias"][:8]: - title = media["title"] - final_path = save_path / f"{title}.mp4" - if final_path.exists(): - logger.info(f"{final_path} already exists, skipped.") - continue - v = video.Video(media["bvid"], credential=credential) - detector = video.VideoDownloadURLDataDetecter( - await v.get_download_url(page_index=0) - ) - streams = detector.detect_best_streams() - if detector.check_flv_stream(): - tmp_path = save_path / f"{title}.flv" - await download_content(streams[0].url, tmp_path) - process = await create_subprocess_exec( - FFMPEG_COMMAND, - "-i", - str(tmp_path), - str(final_path), - stdout=DEVNULL, - stderr=DEVNULL, - ) - await process.communicate() - tmp_path.unlink() - else: - tmp_video_path, tmp_audio_path = ( - save_path / f"{title}_video.m4s", - save_path / f"{title}_audio.m4s", - ) - await download_content(streams[0].url, tmp_video_path) - await download_content(streams[1].url, tmp_audio_path) - # 混流 - process = await create_subprocess_exec( - FFMPEG_COMMAND, - "-i", - str(tmp_video_path), - "-i", - str(tmp_audio_path), - "-c", - "copy", - str(final_path), - stdout=DEVNULL, - stderr=DEVNULL, - ) - await process.communicate() - tmp_video_path.unlink() - tmp_audio_path.unlink() - - logger.info(f"{final_path} downloaded successfully.") - - except Exception: - logger.exception(f"Failed to process favorite {favorite_id}") + favorite_ids.append(favorite_id) + tasks.append(process_favorite(favorite_id)) + favorite_result = await asyncio.gather(*tasks, return_exceptions=True) + for idx, result in enumerate(favorite_result): + if isinstance(result, Exception): + logger.error("Failed to process favorite {}: {}", favorite_ids[idx], result) continue + logger.info("Favorite {} processed successfully.", favorite_ids[idx]) + + +async def process_favorite(favorite_id: int) -> None: + save_path = Path(settings.path_mapper[favorite_id]) + save_path.mkdir(parents=True, exist_ok=True) + favorite_video_list = await favorite_list.get_video_favorite_list_content( + favorite_id, credential=credential + ) + logger.info("start to process favorite {}", favorite_video_list["info"]["title"]) + medias = favorite_video_list["medias"][:12] + tasks = [process_video(save_path, media) for media in medias] + video_result = await asyncio.gather(*tasks, return_exceptions=True) + for idx, result in enumerate(video_result): + if isinstance(result, Exception): + logger.error("Failed to process video {}: {}", medias[idx]["title"], result) + + +async def process_video(save_path: Path, media: dict) -> None: + title = media["title"] + logger.info("start to process video {}", title) + final_path = save_path / f"{title}.mp4" + if final_path.exists(): + logger.info(f"{final_path} already exists, skipped.") + return + v = video.Video(media["bvid"], credential=credential) + detector = video.VideoDownloadURLDataDetecter( + await v.get_download_url(page_index=0) + ) + streams = detector.detect_best_streams() + if detector.check_flv_stream(): + tmp_path = save_path / f"{title}.flv" + await download_content(streams[0].url, tmp_path) + process = await create_subprocess_exec( + FFMPEG_COMMAND, + "-i", + str(tmp_path), + str(final_path), + stdout=DEVNULL, + stderr=DEVNULL, + ) + await process.communicate() + tmp_path.unlink() + else: + tmp_video_path, tmp_audio_path = ( + save_path / f"{title}_video.m4s", + save_path / f"{title}_audio.m4s", + ) + await asyncio.gather( + download_content(streams[0].url, tmp_video_path), + download_content(streams[1].url, tmp_audio_path), + ) + process = await create_subprocess_exec( + FFMPEG_COMMAND, + "-i", + str(tmp_video_path), + "-i", + str(tmp_audio_path), + "-c", + "copy", + str(final_path), + stdout=DEVNULL, + stderr=DEVNULL, + ) + await process.communicate() + tmp_video_path.unlink() + tmp_audio_path.unlink() + logger.info(f"{final_path} downloaded successfully.")