Files
bili-sync/processor.py

267 lines
8.8 KiB
Python

import asyncio
import datetime
from asyncio import Semaphore, create_subprocess_exec
from asyncio.subprocess import DEVNULL
from bilibili_api import favorite_list, video
from bilibili_api.exceptions import ResponseCodeException
from loguru import logger
from tortoise import Tortoise
from constants import FFMPEG_COMMAND, MediaStatus, MediaType
from credential import credential
from models import FavoriteItem, FavoriteList, Upper
from nfo import Actor, EpisodeInfo
from settings import settings
from utils import aexists, amakedirs, client, download_content
anchor = datetime.date.today()
async def cleanup() -> None:
await client.aclose()
await Tortoise.close_connections()
def concurrent_decorator(concurrency: int) -> callable:
sem = Semaphore(value=concurrency)
def decorator(func: callable) -> callable:
async def wrapper(*args, **kwargs) -> any:
async with sem:
return await func(*args, **kwargs)
return wrapper
return decorator
async def manage_model(medias: list[dict], fav_list: FavoriteList) -> None:
uppers = [
Upper(
mid=media["upper"]["mid"],
name=media["upper"]["name"],
thumb=media["upper"]["face"],
)
for media in medias
]
await Upper.bulk_create(
uppers, on_conflict=["mid"], update_fields=["name", "thumb"]
)
items = [
FavoriteItem(
name=media["title"],
type=media["type"],
bvid=media["bvid"],
desc=media["intro"],
cover=media["cover"],
favorite_list=fav_list,
upper_id=media["upper"]["mid"],
ctime=datetime.datetime.utcfromtimestamp(media["ctime"]),
pubtime=datetime.datetime.utcfromtimestamp(media["pubtime"]),
fav_time=datetime.datetime.utcfromtimestamp(media["fav_time"]),
downloaded=False,
)
for media in medias
]
await FavoriteItem.bulk_create(
items,
on_conflict=["bvid", "favorite_list_id"],
update_fields=[
"name",
"type",
"desc",
"cover",
"ctime",
"pubtime",
"fav_time",
],
)
async def process() -> None:
global anchor
if (
today := datetime.date.today()
) > anchor and await credential.check_refresh():
try:
await credential.refresh()
anchor = today
logger.info("Credential refreshed.")
except Exception:
logger.exception("Failed to refresh credential.")
return
for favorite_id in settings.favorite_ids:
if favorite_id not in settings.path_mapper:
logger.warning(
f"Favorite {favorite_id} not in path mapper, ignored."
)
continue
await process_favorite(favorite_id)
async def process_favorite(favorite_id: int) -> None:
# 预先请求第一页内容以获取收藏夹标题
favorite_video_list = await favorite_list.get_video_favorite_list_content(
favorite_id, page=1, credential=credential
)
title = favorite_video_list["info"]["title"]
logger.info(
"Start to process favorite {}: {}",
favorite_id,
title,
)
fav_list, _ = await FavoriteList.get_or_create(
id=favorite_id, defaults={"name": favorite_video_list["info"]["title"]}
)
fav_list.video_list_path.mkdir(parents=True, exist_ok=True)
page = 0
while True:
page += 1
if page > 1:
favorite_video_list = (
await favorite_list.get_video_favorite_list_content(
favorite_id, page=page, credential=credential
)
)
# 先看看对应 bvid 的记录是否存在
existed_items = await FavoriteItem.filter(
favorite_list=fav_list,
bvid__in=[media["bvid"] for media in favorite_video_list["medias"]],
)
# 记录一下获得的列表中的 bvid 和 fav_time
media_info = {
(media["bvid"], media["fav_time"])
for media in favorite_video_list["medias"]
}
# 如果有 bvid 和 fav_time 都相同的记录,说明已经到达了上次处理到的位置
continue_flag = not media_info & {
(item.bvid, int(item.fav_time.timestamp()))
for item in existed_items
}
await manage_model(favorite_video_list["medias"], fav_list)
if not (continue_flag and favorite_video_list["has_more"]):
break
all_unprocessed_items = await FavoriteItem.filter(
favorite_list=fav_list,
type=MediaType.VIDEO,
status=MediaStatus.NORMAL,
downloaded=False,
).prefetch_related("upper")
await asyncio.gather(
*[process_video(item) for item in all_unprocessed_items],
return_exceptions=True,
)
logger.info("Favorite {} {} processed successfully.", favorite_id, title)
@concurrent_decorator(4)
async def process_video(fav_item: FavoriteItem) -> None:
logger.info("Start to process video {} {}", fav_item.bvid, fav_item.name)
if fav_item.type != MediaType.VIDEO:
logger.warning("Media {} is not a video, skipped.", fav_item.name)
return
try:
if await aexists(fav_item.video_path):
fav_item.downloaded = True
await fav_item.save()
logger.info(
"{} {} already exists, skipped.", fav_item.bvid, fav_item.name
)
return
# 写入 up 主头像
if not all(
await asyncio.gather(
aexists(fav_item.upper.thumb_path),
aexists(fav_item.upper.meta_path),
)
):
await amakedirs(fav_item.upper.thumb_path.parent, exist_ok=True)
await fav_item.upper.save_metadata()
await download_content(
fav_item.upper.thumb, fav_item.upper.thumb_path
)
# 写入 nfo
await EpisodeInfo(
title=fav_item.name,
plot=fav_item.desc,
actor=[
Actor(
name=fav_item.upper.mid,
role=fav_item.upper.name,
)
],
bvid=fav_item.bvid,
aired=fav_item.ctime,
).write_nfo(fav_item.nfo_path)
# 写入 poster
await download_content(fav_item.cover, fav_item.poster_path)
# 开始处理视频内容
v = video.Video(fav_item.bvid, credential=credential)
detector = video.VideoDownloadURLDataDetecter(
await v.get_download_url(page_index=0)
)
streams = detector.detect_best_streams()
if detector.check_flv_stream():
await download_content(streams[0].url, fav_item.tmp_video_path)
process = await create_subprocess_exec(
FFMPEG_COMMAND,
"-i",
str(fav_item.tmp_video_path),
str(fav_item.video_path),
stdout=DEVNULL,
stderr=DEVNULL,
)
await process.communicate()
fav_item.tmp_video_path.unlink()
else:
await asyncio.gather(
download_content(streams[0].url, fav_item.tmp_video_path),
download_content(streams[1].url, fav_item.tmp_audio_path),
)
process = await create_subprocess_exec(
FFMPEG_COMMAND,
"-i",
str(fav_item.tmp_video_path),
"-i",
str(fav_item.tmp_audio_path),
"-c",
"copy",
str(fav_item.video_path),
stdout=DEVNULL,
stderr=DEVNULL,
)
await process.communicate()
fav_item.tmp_video_path.unlink()
fav_item.tmp_audio_path.unlink()
fav_item.downloaded = True
await fav_item.save()
logger.info(
"{} {} processed successfully.", fav_item.bvid, fav_item.name
)
except ResponseCodeException as e:
match e.code:
case 62002:
fav_item.status = MediaStatus.INVISIBLE
case -404:
fav_item.status = MediaStatus.DELETED
case _:
logger.exception(
"Failed to process video {} {}, error_code: {}",
fav_item.bvid,
fav_item.name,
e.code,
)
return
await fav_item.save()
logger.error(
"Video {} {} is not available, marked as {}",
fav_item.bvid,
fav_item.name,
fav_item.status.text,
)
except Exception:
logger.exception(
"Failed to process video {} {}", fav_item.bvid, fav_item.name
)