mirror of
https://github.com/amtoaer/bili-sync.git
synced 2026-06-27 18:32:43 +08:00
feat: 接入数据库,功能基本完成
This commit is contained in:
207
processor.py
207
processor.py
@@ -8,9 +8,11 @@ import aiofiles
|
||||
import httpx
|
||||
from bilibili_api import HEADERS, favorite_list, video
|
||||
from loguru import logger
|
||||
from tortoise import Tortoise
|
||||
|
||||
from constants import FFMPEG_COMMAND, MediaType
|
||||
from constants import DEFAULT_THUMB_PATH, FFMPEG_COMMAND, MediaType
|
||||
from credential import credential
|
||||
from models import FavoriteItem, FavoriteList, Upper
|
||||
from nfo import Actor, EpisodeInfo
|
||||
from settings import settings
|
||||
|
||||
@@ -21,6 +23,7 @@ client = httpx.AsyncClient(headers=HEADERS)
|
||||
|
||||
async def cleanup() -> None:
|
||||
await client.aclose()
|
||||
await Tortoise.close_connections()
|
||||
|
||||
|
||||
def concurrent_decorator(concurrency: int) -> callable:
|
||||
@@ -36,7 +39,7 @@ def concurrent_decorator(concurrency: int) -> callable:
|
||||
return decorator
|
||||
|
||||
|
||||
async def download_content(url: str, path: Path):
|
||||
async def download_content(url: str, path: Path) -> None:
|
||||
async with client.stream("GET", url) as resp, aiofiles.open(
|
||||
path, "wb"
|
||||
) as f:
|
||||
@@ -46,11 +49,53 @@ async def download_content(url: str, path: Path):
|
||||
await f.write(chunk)
|
||||
|
||||
|
||||
async def process():
|
||||
async def manage_model(medias: list[dict], fav_list: FavoriteList) -> None:
|
||||
uppers = [
|
||||
Upper(
|
||||
mid=media["upper"]["mid"],
|
||||
name=media["upper"]["name"],
|
||||
thumb=media["upper"]["face"],
|
||||
)
|
||||
for media in medias
|
||||
]
|
||||
await Upper.bulk_create(
|
||||
uppers, on_conflict=["mid"], update_fields=["name", "thumb"]
|
||||
)
|
||||
items = [
|
||||
FavoriteItem(
|
||||
name=media["title"],
|
||||
type=media["type"],
|
||||
bvid=media["bvid"],
|
||||
desc=media["intro"],
|
||||
cover=media["cover"],
|
||||
favorite_list=fav_list,
|
||||
upper_id=media["upper"]["mid"],
|
||||
ctime=datetime.datetime.utcfromtimestamp(media["ctime"]),
|
||||
pubtime=datetime.datetime.utcfromtimestamp(media["pubtime"]),
|
||||
fav_time=datetime.datetime.utcfromtimestamp(media["fav_time"]),
|
||||
downloaded=False,
|
||||
)
|
||||
for media in medias
|
||||
]
|
||||
await FavoriteItem.bulk_create(
|
||||
items,
|
||||
on_conflict=["bvid", "favorite_list_id"],
|
||||
update_fields=[
|
||||
"name",
|
||||
"type",
|
||||
"desc",
|
||||
"cover",
|
||||
"ctime",
|
||||
"pubtime",
|
||||
"fav_time",
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
async def process() -> None:
|
||||
global anchor
|
||||
if (
|
||||
datetime.datetime.now() - anchor
|
||||
).days >= 1 and await credential.check_refresh():
|
||||
if (datetime.datetime.now() - anchor).days >= 3:
|
||||
# 暂定三天刷新一次凭据,具体看情况调整
|
||||
try:
|
||||
credential.refresh()
|
||||
anchor = datetime.datetime.today()
|
||||
@@ -68,98 +113,138 @@ async def process():
|
||||
|
||||
|
||||
async def process_favorite(favorite_id: int) -> None:
|
||||
save_path = Path(settings.path_mapper[favorite_id])
|
||||
save_path.mkdir(parents=True, exist_ok=True)
|
||||
page, tasks = 0, []
|
||||
# 预先请求第一页内容以获取收藏夹标题
|
||||
favorite_video_list = await favorite_list.get_video_favorite_list_content(
|
||||
favorite_id, page=1, credential=credential
|
||||
)
|
||||
logger.info(
|
||||
"start to process favorite {}: {}",
|
||||
favorite_id,
|
||||
favorite_video_list["info"]["title"],
|
||||
)
|
||||
fav_list, _ = await FavoriteList.get_or_create(
|
||||
id=favorite_id, defaults={"name": favorite_video_list["info"]["title"]}
|
||||
)
|
||||
fav_list.video_list_path.mkdir(parents=True, exist_ok=True)
|
||||
DEFAULT_THUMB_PATH.mkdir(parents=True, exist_ok=True)
|
||||
page = 0
|
||||
while True:
|
||||
page += 1
|
||||
favorite_video_list = (
|
||||
await favorite_list.get_video_favorite_list_content(
|
||||
favorite_id, page=page, credential=credential
|
||||
if page > 1:
|
||||
favorite_video_list = (
|
||||
await favorite_list.get_video_favorite_list_content(
|
||||
favorite_id, page=page, credential=credential
|
||||
)
|
||||
)
|
||||
# 先看看对应 bvid 的记录是否存在
|
||||
existed_items = await FavoriteItem.filter(
|
||||
favorite_list=fav_list,
|
||||
bvid__in=[media["bvid"] for media in favorite_video_list["medias"]],
|
||||
)
|
||||
if page == 1:
|
||||
logger.info(
|
||||
"start to process favorite {}: {}",
|
||||
favorite_id,
|
||||
favorite_video_list["info"]["title"],
|
||||
)
|
||||
tasks.extend(
|
||||
[
|
||||
process_video(save_path, media)
|
||||
for media in favorite_video_list["medias"]
|
||||
]
|
||||
logger.info("len of existed_items: {}", len(existed_items))
|
||||
# 记录一下获得的列表中的 bvid 和 fav_time
|
||||
media_info = {
|
||||
(media["bvid"], media["fav_time"])
|
||||
for media in favorite_video_list["medias"]
|
||||
}
|
||||
logger.error("media_info: {}", media_info)
|
||||
logger.error(
|
||||
"existed_items: {}",
|
||||
{
|
||||
(item.bvid, int(item.fav_time.timestamp()))
|
||||
for item in existed_items
|
||||
},
|
||||
)
|
||||
if not favorite_video_list["has_more"]:
|
||||
# 如果有 bvid 和 fav_time 都相同的记录,说明已经到达了上次处理到的位置
|
||||
continue_flag = not media_info & {
|
||||
(item.bvid, int(item.fav_time.timestamp()))
|
||||
for item in existed_items
|
||||
}
|
||||
if not continue_flag:
|
||||
logger.info("已经处理到上次的位置,跳过。")
|
||||
await manage_model(favorite_video_list["medias"], fav_list)
|
||||
if not (continue_flag and favorite_video_list["has_more"]):
|
||||
break
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
all_unprocessed_items = await FavoriteItem.filter(
|
||||
favorite_list=fav_list, downloaded=False
|
||||
).prefetch_related("upper")
|
||||
result = await asyncio.gather(
|
||||
*[process_video(item) for item in list(all_unprocessed_items)[:5]],
|
||||
return_exceptions=True,
|
||||
)
|
||||
logger.error(result)
|
||||
|
||||
|
||||
@concurrent_decorator(4)
|
||||
async def process_video(save_path: Path, media: dict) -> None:
|
||||
title = media["title"]
|
||||
safe_title = media["title"].replace("/", "_")
|
||||
logger.info("start to process video {}", title)
|
||||
if media["type"] != MediaType.VIDEO:
|
||||
logger.warning("Media {} is not a video, skipped.", title)
|
||||
async def process_video(fav_item: FavoriteItem) -> None:
|
||||
logger.info("start to process video {}", fav_item.name)
|
||||
if fav_item.type != MediaType.VIDEO:
|
||||
logger.warning("Media {} is not a video, skipped.", fav_item.name)
|
||||
return
|
||||
final_path = save_path / f"{safe_title}.mp4"
|
||||
if final_path.exists():
|
||||
logger.info(f"{final_path} already exists, skipped.")
|
||||
if fav_item.video_path.exists():
|
||||
fav_item.downloaded = True
|
||||
await fav_item.save()
|
||||
logger.info(
|
||||
"{} {} already exists, skipped.", fav_item.bvid, fav_item.name
|
||||
)
|
||||
return
|
||||
# 写入 up 主头像
|
||||
if not fav_item.upper.thumb_path.exists():
|
||||
await download_content(fav_item.upper.thumb, fav_item.upper.thumb_path)
|
||||
# 写入 nfo
|
||||
nfo_path = save_path / f"{safe_title}.nfo"
|
||||
EpisodeInfo(
|
||||
title=title,
|
||||
plot=media["intro"],
|
||||
actor=[Actor(f"{media['upper']['mid']} - {media['upper']['name']}")],
|
||||
bvid=media["bvid"],
|
||||
aired=datetime.datetime.fromtimestamp(media["ctime"]),
|
||||
).write_nfo(nfo_path)
|
||||
title=fav_item.name,
|
||||
plot=fav_item.desc,
|
||||
actor=[
|
||||
Actor(
|
||||
name=fav_item.upper.mid,
|
||||
role=fav_item.upper.name,
|
||||
thumb=fav_item.upper.thumb_path,
|
||||
)
|
||||
],
|
||||
bvid=fav_item.bvid,
|
||||
aired=fav_item.ctime,
|
||||
).write_nfo(fav_item.nfo_path)
|
||||
# 写入 poster
|
||||
cover_path = save_path / f"{safe_title}-poster.jpg"
|
||||
await download_content(media["cover"], cover_path)
|
||||
await download_content(fav_item.cover, fav_item.poster_path)
|
||||
# 开始处理视频内容
|
||||
v = video.Video(media["bvid"], credential=credential)
|
||||
v = video.Video(fav_item.bvid, credential=credential)
|
||||
detector = video.VideoDownloadURLDataDetecter(
|
||||
await v.get_download_url(page_index=0)
|
||||
)
|
||||
streams = detector.detect_best_streams()
|
||||
if detector.check_flv_stream():
|
||||
tmp_path = save_path / f"{safe_title}.flv"
|
||||
await download_content(streams[0].url, tmp_path)
|
||||
await download_content(streams[0].url, fav_item.tmp_video_path)
|
||||
process = await create_subprocess_exec(
|
||||
FFMPEG_COMMAND,
|
||||
"-i",
|
||||
str(tmp_path),
|
||||
str(final_path),
|
||||
str(fav_item.tmp_video_path),
|
||||
str(fav_item.video_path),
|
||||
stdout=DEVNULL,
|
||||
stderr=DEVNULL,
|
||||
)
|
||||
await process.communicate()
|
||||
tmp_path.unlink()
|
||||
fav_item.tmp_video_path.unlink()
|
||||
else:
|
||||
tmp_video_path, tmp_audio_path = (
|
||||
save_path / f"{safe_title}_video.m4s",
|
||||
save_path / f"{safe_title}_audio.m4s",
|
||||
)
|
||||
await asyncio.gather(
|
||||
download_content(streams[0].url, tmp_video_path),
|
||||
download_content(streams[1].url, tmp_audio_path),
|
||||
download_content(streams[0].url, fav_item.tmp_video_path),
|
||||
download_content(streams[1].url, fav_item.tmp_audio_path),
|
||||
)
|
||||
process = await create_subprocess_exec(
|
||||
FFMPEG_COMMAND,
|
||||
"-i",
|
||||
str(tmp_video_path),
|
||||
str(fav_item.tmp_video_path),
|
||||
"-i",
|
||||
str(tmp_audio_path),
|
||||
str(fav_item.tmp_audio_path),
|
||||
"-c",
|
||||
"copy",
|
||||
str(final_path),
|
||||
str(fav_item.video_path),
|
||||
stdout=DEVNULL,
|
||||
stderr=DEVNULL,
|
||||
)
|
||||
await process.communicate()
|
||||
tmp_video_path.unlink()
|
||||
tmp_audio_path.unlink()
|
||||
logger.info(f"{title} downloaded successfully.")
|
||||
fav_item.tmp_video_path.unlink()
|
||||
fav_item.tmp_audio_path.unlink()
|
||||
fav_item.downloaded = True
|
||||
await fav_item.save()
|
||||
logger.info("{} {} processed successfully.", fav_item.bvid, fav_item.name)
|
||||
|
||||
Reference in New Issue
Block a user