Compare commits


20 Commits
1.0.3 ... 1.0.9

Author SHA1 Message Date
amtoaer 46d1810e7c chore: move the success log one level up 2023-12-04 01:50:31 +08:00
amtoaer 89e2567fef feat: tag fetch failure no longer breaks the main flow 2023-12-04 01:35:29 +08:00
amtoaer 38caf1f0d6 fix: fix runtime error 2023-12-04 01:02:04 +08:00
amtoaer 6877171f4d fix: fix argument error 2023-12-04 00:54:04 +08:00
amtoaer 29d06a040b fix: register the commands 2023-12-04 00:50:22 +08:00
amtoaer ceec5d6780 feat: attempt to support video tags 2023-12-04 00:39:42 +08:00
amtoaer 650498d4a1 fix: credential should be checked only once per day 2023-12-03 23:14:15 +08:00
amtoaer 96ff84391d doc: fix overly long image path 2023-12-02 01:29:33 +08:00
amtoaer 44e8a2c97d doc: describe the extra commands, add images 2023-12-02 01:26:55 +08:00
amtoaer c3bfb3c2e5 doc: uploader avatar issue resolved, update README 2023-12-02 01:12:59 +08:00
amtoaer ec91cbf3ed fix: continue fixing field errors 2023-12-02 00:51:21 +08:00
amtoaer f174a3b898 fix: fix field error; start credential refresh from the next day 2023-12-02 00:46:21 +08:00
amtoaer c8fca7fcca style: format code 2023-12-02 00:40:55 +08:00
amtoaer 6ef25d6409 fix: fix type error, add some logging 2023-12-02 00:40:35 +08:00
amtoaer f10fc9dd97 fix: fix wrong field value 2023-12-02 00:30:56 +08:00
amtoaer d21f14d851 ci: build debug image on pushes to main 2023-12-02 00:30:47 +08:00
amtoaer 012b3f9f31 fix: attempt to fix the emby avatar path; make all file operations async 2023-12-02 00:26:19 +08:00
amtoaer bbde9d6ba6 fix: fix a condition check 2023-11-30 18:12:15 +08:00
amtoaer e040ab2d75 chore: try lowering the credential refresh frequency to avoid risk control 2023-11-30 17:50:36 +08:00
amtoaer ad977e41d4 fix: fix an async function that was missing await 2023-11-28 13:55:12 +08:00
10 changed files with 345 additions and 135 deletions


@@ -0,0 +1,29 @@
+name: Docker Image CI (DEBUG)
+on:
+  push:
+    branches:
+      - main
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+      -
+        name: Checkout
+        uses: actions/checkout@v3
+      -
+        name: Login to DockerHub
+        uses: docker/login-action@v3
+        with:
+          username: ${{ secrets.DOCKERHUB_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_TOKEN }}
+      -
+        name: Build and push images
+        uses: docker/build-push-action@v5
+        with:
+          context: .
+          file: ./Dockerfile
+          push: true
+          tags: |
+            ${{ secrets.DOCKERHUB_USERNAME }}/bili-sync:debug

.gitignore vendored

@@ -4,7 +4,7 @@ debug.py
 videos
 config.test.json
 database.test.db*
-example.json
+example*.json
 thumbs.test
 config
 data


@@ -1,4 +1,6 @@
 # bili-sync
+![bili-sync](https://socialify.git.ci/amtoaer/bili-sync/image?description=1&font=KoHo&issues=1&language=1&logo=https%3A%2F%2Fs2.loli.net%2F2023%2F12%2F02%2F9EwT2yInOu1d3zm.png&name=1&owner=1&pattern=Signal&pulls=1&stargazers=1&theme=Light)
+## Introduction
 A BILIBILI favorites sync tool written for NAS users; the synced videos can easily be imported into media library tools such as EMBY.
@@ -37,15 +39,6 @@ class Config(DataClassJsonMixin):
 That is: run the program once, let it write the initial config and exit with a configuration error, edit `config.json`, and then run it again.
-## About uploader avatars
-The global environment variable `THUMB_PATH` designates where uploader avatars are stored.
-When downloading a video, if the uploader's avatar does not exist yet, it is downloaded to `THUMB_PATH`, and the avatar's absolute path is written into the video's NFO file.
-In practice, however, EMBY does not seem to read local avatar paths from NFO files; this will be fixed once a workaround is found.
-> That said, the basic logic is: for the avatar path that `bili-sync` writes into the NFO to be readable by EMBY, the avatar's absolute path must be identical in both containers. So even though avatars cannot be loaded yet, it is still recommended to set THUMB_PATH and make sure it points to the same folder in both the `bili-sync` and `emby` containers (i.e. mount one folder to THUMB_PATH in both).
 ## Docker run example
@@ -60,17 +53,16 @@ services:
       - /home/amtoaer/Videos/Bilibilis/:/Videos/Bilibilis/ # video files
       - /home/amtoaer/.config/nas/bili-sync/config/:/app/config/ # config file
       - /home/amtoaer/.config/nas/bili-sync/data/:/app/data/ # database
+      # Note: to see uploader avatars inside emby, mount emby's metadata/people/ config directory to /app/thumb/ in this container
+      - /home/amtoaer/.config/nas/emby/metadata/people/:/app/thumb/
     environment:
-      - THUMB_PATH=/Videos/Bilibilis/thumb/ # put avatars in the thumb folder next to the videos
+      - TZ=Asia/Shanghai
     restart: always
     network_mode: bridge
     hostname: bili-sync
     container_name: bili-sync
     logging:
-      driver: "json-file"
+      driver: "local"
+      options:
+        max-size: "30m"
 ```
 The corresponding config file:
@@ -92,9 +84,19 @@ services:
 }
 ```
-## Current issues
-- [ ] Look into NFO to figure out how to read local actor avatars properly
+## Supported extra commands
+To cover a few extra needs, the app ships several standalone commands, which can be run from the program directory with `python entry.py ${command name}`.
+1. `once`
+   Process the favorites exactly as a scheduled run would, but only once.
+2. `recheck`
+   Mark video files that no longer exist locally as not downloaded; they will be downloaded again on the next scheduled run.
+3. `upper_thumb`
+   Manually trigger a full download of uploader avatars, adding avatars for videos downloaded with older versions.
 ## Roadmap


@@ -1,10 +1,11 @@
 import asyncio
-from aiofiles.os import path
 from loguru import logger

 from constants import MediaStatus, MediaType
-from models import FavoriteItem
+from models import FavoriteItem, Upper
+from processor import download_content, process_video
+from utils import aexists, amakedirs, aremove


 async def recheck():
@@ -14,9 +15,7 @@ async def recheck():
         status=MediaStatus.NORMAL,
         downloaded=True,
     )
-    exists = await asyncio.gather(
-        *[path.exists(item.video_path) for item in items]
-    )
+    exists = await asyncio.gather(*[aexists(item.video_path) for item in items])
     for item, exist in zip(items, exists):
         if isinstance(exist, Exception):
             logger.error(
@@ -36,3 +35,54 @@ async def recheck():
     logger.info("Updating database...")
     await FavoriteItem.bulk_update(items, fields=["downloaded"])
     logger.info("Database updated.")
+
+
+async def upper_thumb():
+    """Write uploader avatars to disk in bulk; call once manually after upgrading from a version without uploader-avatar support."""
+    makedir_tasks = []
+    other_tasks = []
+    for upper in await Upper.all():
+        if all(
+            await asyncio.gather(
+                aexists(upper.thumb_path), aexists(upper.meta_path)
+            )
+        ):
+            logger.info(
+                "Upper {} {} already exists, skipped.", upper.mid, upper.name
+            )
+            continue
+        makedir_tasks.append(amakedirs(upper.thumb_path.parent, exist_ok=True))
+        logger.info("Saving metadata for upper {} {}...", upper.mid, upper.name)
+        other_tasks.extend(
+            [
+                upper.save_metadata(),
+                download_content(upper.thumb, upper.thumb_path),
+            ]
+        )
+    await asyncio.gather(*makedir_tasks)
+    await asyncio.gather(*other_tasks)
+    logger.info("All done.")
+
+
+async def refresh_tags():
+    """Refresh tags for videos that already exist; call once manually after upgrading from a version without tag support."""
+    items = await FavoriteItem.filter(
+        downloaded=True,
+        tags=None,
+    ).prefetch_related("upper")
+    await asyncio.gather(
+        *[aremove(item.nfo_path) for item in items],
+        return_exceptions=True,
+    )
+    await asyncio.gather(
+        *[
+            process_video(
+                item,
+                process_poster=False,
+                process_video=False,
+                process_nfo=True,
+                process_upper=False,
+            )
+            for item in items
+        ],
+        return_exceptions=True,
+    )


@@ -4,7 +4,7 @@ import sys
 import uvloop
 from loguru import logger

-from commands import recheck
+from commands import recheck, refresh_tags, upper_thumb
 from models import init_model
 from processor import cleanup, process
 from settings import settings
@@ -14,16 +14,16 @@ asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 async def entry() -> None:
     await init_model()
-    if any("once" in _ for _ in sys.argv):
-        # single run
-        logger.info("Running once...")
-        await process()
-        return
-    if any("recheck" in _ for _ in sys.argv):
-        # recheck
-        logger.info("Rechecking...")
-        await recheck()
-        return
+    for command, func in [
+        ("once", process),
+        ("recheck", recheck),
+        ("upper_thumb", upper_thumb),
+        ("refresh_tags", refresh_tags),
+    ]:
+        if any(command in _ for _ in sys.argv):
+            logger.info("Running {}...", command)
+            await func()
+            return
     logger.info("Running daemon...")
     while True:
         await process()
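The refactor above replaces per-command if-blocks with a table-driven dispatch that substring-matches command names against `sys.argv`. A minimal synchronous sketch of the same pattern, with placeholder handlers standing in for the project's real coroutines:

```python
def once() -> str:
    # Placeholder for the real `process` coroutine.
    return "processed favorites once"


def recheck() -> str:
    # Placeholder for the real `recheck` coroutine.
    return "rechecked local files"


# (name, handler) pairs, mirroring the diff's command table.
COMMANDS = [("once", once), ("recheck", recheck)]


def dispatch(argv: list[str]) -> str:
    # Substring match, as in the diff: any(command in _ for _ in sys.argv).
    # The first matching command wins and the program exits afterwards.
    for name, func in COMMANDS:
        if any(name in arg for arg in argv):
            return func()
    return "daemon mode"


print(dispatch(["entry.py", "recheck"]))  # -> rechecked local files
```

Note the substring match is deliberately loose (e.g. `--recheck` also matches), which keeps argument parsing trivial at the cost of strictness.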


@@ -0,0 +1,11 @@
+from tortoise import BaseDBAsyncClient
+
+
+async def upgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        ALTER TABLE "favoriteitem" ADD "tags" JSON;"""
+
+
+async def downgrade(db: BaseDBAsyncClient) -> str:
+    return """
+        ALTER TABLE "favoriteitem" DROP COLUMN "tags";"""


@@ -13,6 +13,7 @@ from constants import (
     MediaType,
 )
 from settings import settings
+from utils import aopen


 class FavoriteList(Model):
@@ -39,7 +40,31 @@ class Upper(Model):
     @property
     def thumb_path(self) -> Path:
-        return DEFAULT_THUMB_PATH / f"{self.mid}.jpg"
+        return (
+            DEFAULT_THUMB_PATH / str(self.mid)[0] / f"{self.mid}" / "folder.jpg"
+        )
+
+    @property
+    def meta_path(self) -> Path:
+        return (
+            DEFAULT_THUMB_PATH / str(self.mid)[0] / f"{self.mid}" / "person.nfo"
+        )
+
+    async def save_metadata(self):
+        async with aopen(self.meta_path, "w") as f:
+            await f.write(
+                f"""
+<?xml version="1.0" encoding="utf-8" standalone="yes"?>
+<person>
+  <plot />
+  <outline />
+  <lockdata>false</lockdata>
+  <dateadded>{self.created_at.strftime("%Y-%m-%d %H:%M:%S")}</dateadded>
+  <title>{self.mid}</title>
+  <sorttitle>{self.mid}</sorttitle>
+</person>
+""".strip()
+            )


 class FavoriteItem(Model):
@@ -54,6 +79,7 @@ class FavoriteItem(Model):
     bvid = fields.CharField(max_length=255)
     desc = fields.TextField()
     cover = fields.TextField()
+    tags = fields.JSONField(null=True)
     favorite_list = fields.ForeignKeyField(
         "models.FavoriteList", related_name="items"
     )
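The new `thumb_path`/`meta_path` layout shards each uploader into a directory keyed by the first character of the `mid`, which matches Emby's `metadata/people/<initial>/<name>/` folder convention. A minimal sketch of the path construction, with an assumed base directory standing in for the project's `DEFAULT_THUMB_PATH`:

```python
from pathlib import Path

# Assumed base directory for this sketch; the project uses DEFAULT_THUMB_PATH.
THUMB_ROOT = Path("/app/thumb")


def thumb_path(mid: int) -> Path:
    # First character of the mid is the shard directory; the avatar is
    # stored as folder.jpg inside a per-uploader folder.
    return THUMB_ROOT / str(mid)[0] / str(mid) / "folder.jpg"


def meta_path(mid: int) -> Path:
    # person.nfo sits next to folder.jpg in the same per-uploader folder.
    return THUMB_ROOT / str(mid)[0] / str(mid) / "person.nfo"


print(thumb_path(123456))  # /app/thumb/1/123456/folder.jpg
```

Mounting Emby's `metadata/people/` at this root means Emby picks the avatar and `person.nfo` up directly, which is why the README drops the old `THUMB_PATH` approach.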

nfo.py

@@ -2,19 +2,19 @@ import datetime
 from dataclasses import dataclass
 from pathlib import Path

+from utils import aopen
+

 @dataclass
 class Actor:
     name: str
     role: str
-    thumb: Path

     def to_xml(self) -> str:
         return f"""
 <actor>
   <name>{self.name}</name>
   <role>{self.role}</role>
-  <thumb>{self.thumb.resolve()}</thumb>
 </actor>
 """.strip(
             "\n"
@@ -25,16 +25,22 @@ class Actor:
 class EpisodeInfo:
     title: str
     plot: str
+    tags: list[str]
     actor: list[Actor]
     bvid: str
     aired: datetime.datetime

-    def write_nfo(self, path: Path) -> None:
-        with path.open("w", encoding="utf-8") as f:
-            f.write(self.to_xml())
+    async def write_nfo(self, path: Path) -> None:
+        async with aopen(path, "w", encoding="utf-8") as f:
+            await f.write(self.to_xml())

     def to_xml(self) -> str:
         actor = "\n".join(_.to_xml() for _ in self.actor)
+        tags = (
+            "\n".join(f"  <genre>{_}</genre>" for _ in self.tags)
+            if isinstance(self.tags, list)
+            else ""
+        )
         return f"""
 <?xml version="1.0" encoding="utf-8" standalone="yes"?>
 <episodedetails>
@@ -43,6 +49,7 @@ class EpisodeInfo:
   <title>{self.title}</title>
 {actor}
   <year>{self.aired.year}</year>
+{tags}
   <uniqueid type="bilibili">{self.bvid}</uniqueid>
   <aired>{self.aired.strftime("%Y-%m-%d")}</aired>
 </episodedetails>
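The `tags` field is rendered as one `<genre>` element per tag and skipped entirely when tags are absent (e.g. when the tag fetch failed and `tags` is still `None`). That rendering logic, isolated as a standalone sketch:

```python
def render_tags(tags) -> str:
    # Mirrors the diff: only a real list yields <genre> lines;
    # None (tag fetch failed or not yet fetched) renders as an empty string,
    # so the NFO stays valid either way.
    if isinstance(tags, list):
        return "\n".join(f"  <genre>{t}</genre>" for t in tags)
    return ""


print(render_tags(["music", "cover"]))  # two <genre> lines
```

The `isinstance` check rather than a plain truthiness test means an empty list renders as an empty string too, while non-list junk in the database can never crash NFO generation.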


@@ -2,11 +2,8 @@ import asyncio
 import datetime
 from asyncio import Semaphore, create_subprocess_exec
 from asyncio.subprocess import DEVNULL
-from pathlib import Path

-import aiofiles
-import httpx
-from bilibili_api import HEADERS, favorite_list, video
+from bilibili_api import favorite_list, video
 from bilibili_api.exceptions import ResponseCodeException
 from loguru import logger
 from tortoise import Tortoise
@@ -16,8 +13,9 @@ from credential import credential
 from models import FavoriteItem, FavoriteList, Upper
 from nfo import Actor, EpisodeInfo
 from settings import settings
+from utils import aexists, amakedirs, client, download_content

-client = httpx.AsyncClient(headers=HEADERS)
+anchor = datetime.date.today()


 async def cleanup() -> None:
@@ -38,16 +36,6 @@ def concurrent_decorator(concurrency: int) -> callable:
     return decorator


-async def download_content(url: str, path: Path) -> None:
-    async with client.stream("GET", url) as resp, aiofiles.open(
-        path, "wb"
-    ) as f:
-        async for chunk in resp.aiter_bytes(40960):
-            if not chunk:
-                return
-            await f.write(chunk)


 async def manage_model(medias: list[dict], fav_list: FavoriteList) -> None:
     uppers = [
         Upper(
@@ -93,16 +81,16 @@ async def manage_model(medias: list[dict], fav_list: FavoriteList) -> None:
 async def process() -> None:
-    if not await credential.check_valid():
-        logger.error("Credential is invalid, skipped.")
-        return
-    if await credential.check_refresh():
-        try:
-            credential.refresh()
-            logger.info("Credential refreshed.")
-        except Exception:
-            logger.exception("Failed to refresh credential.")
-            return
+    global anchor
+    if (today := datetime.date.today()) > anchor:
+        anchor = today
+        logger.info("Check credential.")
+        if await credential.check_refresh():
+            try:
+                await credential.refresh()
+                logger.info("Credential refreshed.")
+            except Exception:
+                logger.exception("Failed to refresh credential.")
+                return
     for favorite_id in settings.favorite_ids:
         if favorite_id not in settings.path_mapper:
             logger.warning(
@@ -168,82 +156,141 @@ async def process_favorite(favorite_id: int) -> None:
 @concurrent_decorator(4)
-async def process_video(fav_item: FavoriteItem) -> None:
+async def process_video(
+    fav_item: FavoriteItem,
+    process_poster=True,
+    process_video=True,
+    process_nfo=True,
+    process_upper=True,
+) -> None:
     logger.info("Start to process video {} {}", fav_item.bvid, fav_item.name)
     if fav_item.type != MediaType.VIDEO:
         logger.warning("Media {} is not a video, skipped.", fav_item.name)
         return
+    v = video.Video(fav_item.bvid, credential=credential)
     try:
-        if fav_item.video_path.exists():
-            fav_item.downloaded = True
-            await fav_item.save()
-            logger.info(
-                "{} {} already exists, skipped.", fav_item.bvid, fav_item.name
-            )
-            return
-        # write the uploader avatar
-        if not fav_item.upper.thumb_path.exists():
-            await download_content(
-                fav_item.upper.thumb, fav_item.upper.thumb_path
-            )
-        # write the nfo
-        EpisodeInfo(
-            title=fav_item.name,
-            plot=fav_item.desc,
-            actor=[
-                Actor(
-                    name=fav_item.upper.mid,
-                    role=fav_item.upper.name,
-                    thumb=fav_item.upper.thumb_path,
-                )
-            ],
-            bvid=fav_item.bvid,
-            aired=fav_item.ctime,
-        ).write_nfo(fav_item.nfo_path)
-        # write the poster
-        await download_content(fav_item.cover, fav_item.poster_path)
-        # start processing the video content
-        v = video.Video(fav_item.bvid, credential=credential)
-        detector = video.VideoDownloadURLDataDetecter(
-            await v.get_download_url(page_index=0)
-        )
-        streams = detector.detect_best_streams()
-        if detector.check_flv_stream():
-            await download_content(streams[0].url, fav_item.tmp_video_path)
-            process = await create_subprocess_exec(
-                FFMPEG_COMMAND,
-                "-i",
-                str(fav_item.tmp_video_path),
-                str(fav_item.video_path),
-                stdout=DEVNULL,
-                stderr=DEVNULL,
-            )
-            await process.communicate()
-            fav_item.tmp_video_path.unlink()
-        else:
-            await asyncio.gather(
-                download_content(streams[0].url, fav_item.tmp_video_path),
-                download_content(streams[1].url, fav_item.tmp_audio_path),
-            )
-            process = await create_subprocess_exec(
-                FFMPEG_COMMAND,
-                "-i",
-                str(fav_item.tmp_video_path),
-                "-i",
-                str(fav_item.tmp_audio_path),
-                "-c",
-                "copy",
-                str(fav_item.video_path),
-                stdout=DEVNULL,
-                stderr=DEVNULL,
-            )
-            await process.communicate()
-            fav_item.tmp_video_path.unlink()
-            fav_item.tmp_audio_path.unlink()
-        fav_item.downloaded = True
-        await fav_item.save()
-        logger.info(
-            "{} {} processed successfully.", fav_item.bvid, fav_item.name
-        )
+        if process_upper:
+            # write the uploader avatar
+            if not all(
+                await asyncio.gather(
+                    aexists(fav_item.upper.thumb_path),
+                    aexists(fav_item.upper.meta_path),
+                )
+            ):
+                await amakedirs(fav_item.upper.thumb_path.parent, exist_ok=True)
+                await asyncio.gather(
+                    fav_item.upper.save_metadata(),
+                    download_content(
+                        fav_item.upper.thumb, fav_item.upper.thumb_path
+                    ),
+                    return_exceptions=True,
+                )
+            else:
+                logger.info(
+                    "Upper {} {} already exists, skipped.",
+                    fav_item.upper.mid,
+                    fav_item.upper.name,
+                )
+        if process_nfo:
+            if not await aexists(fav_item.nfo_path):
+                if fav_item.tags is None:
+                    try:
+                        fav_item.tags = [
+                            _["tag_name"] for _ in await v.get_tags()
+                        ]
+                    except Exception:
+                        logger.exception(
+                            "Failed to get tags of video {} {}",
+                            fav_item.bvid,
+                            fav_item.name,
+                        )
+                # write the nfo
+                await EpisodeInfo(
+                    title=fav_item.name,
+                    plot=fav_item.desc,
+                    actor=[
+                        Actor(
+                            name=fav_item.upper.mid,
+                            role=fav_item.upper.name,
+                        )
+                    ],
+                    tags=fav_item.tags,
+                    bvid=fav_item.bvid,
+                    aired=fav_item.ctime,
+                ).write_nfo(fav_item.nfo_path)
+            else:
+                logger.info(
+                    "NFO of {} {} already exists, skipped.",
+                    fav_item.bvid,
+                    fav_item.name,
+                )
+        if process_poster:
+            # write the poster
+            if not await aexists(fav_item.poster_path):
+                await download_content(fav_item.cover, fav_item.poster_path)
+            else:
+                logger.info(
+                    "Poster of {} {} already exists, skipped.",
+                    fav_item.bvid,
+                    fav_item.name,
+                )
+        if process_video:
+            if await aexists(fav_item.video_path):
+                fav_item.downloaded = True
+                logger.info(
+                    "Video {} {} already exists, skipped.",
+                    fav_item.bvid,
+                    fav_item.name,
+                )
+            else:
+                # start processing the video content
+                detector = video.VideoDownloadURLDataDetecter(
+                    await v.get_download_url(page_index=0)
+                )
+                streams = detector.detect_best_streams()
+                if detector.check_flv_stream():
+                    await download_content(
+                        streams[0].url, fav_item.tmp_video_path
+                    )
+                    process = await create_subprocess_exec(
+                        FFMPEG_COMMAND,
+                        "-i",
+                        str(fav_item.tmp_video_path),
+                        str(fav_item.video_path),
+                        stdout=DEVNULL,
+                        stderr=DEVNULL,
+                    )
+                    await process.communicate()
+                    fav_item.tmp_video_path.unlink()
+                else:
+                    await asyncio.gather(
+                        download_content(
+                            streams[0].url, fav_item.tmp_video_path
+                        ),
+                        download_content(
+                            streams[1].url, fav_item.tmp_audio_path
+                        ),
+                    )
+                    process = await create_subprocess_exec(
+                        FFMPEG_COMMAND,
+                        "-i",
+                        str(fav_item.tmp_video_path),
+                        "-i",
+                        str(fav_item.tmp_audio_path),
+                        "-c",
+                        "copy",
+                        str(fav_item.video_path),
+                        stdout=DEVNULL,
+                        stderr=DEVNULL,
+                    )
+                    await process.communicate()
+                    fav_item.tmp_video_path.unlink()
+                    fav_item.tmp_audio_path.unlink()
+                fav_item.downloaded = True
+        logger.info(
+            "{} {} processed successfully.",
+            fav_item.bvid,
+            fav_item.name,
+        )
     except ResponseCodeException as e:
         match e.code:
@@ -259,7 +306,6 @@ async def process_video(fav_item: FavoriteItem) -> None:
                     e.code,
                 )
                 return
-        await fav_item.save()
         logger.error(
             "Video {} {} is not available, marked as {}",
             fav_item.bvid,
@@ -270,3 +316,5 @@ async def process_video(fav_item: FavoriteItem) -> None:
         logger.exception(
             "Failed to process video {} {}", fav_item.bvid, fav_item.name
         )
+    finally:
+        await fav_item.save()
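The `anchor` date introduced above gates the credential check to at most once per day: the check runs only when today's date is strictly later than the stored anchor, and the anchor is then advanced. A reduced sketch of that gating logic:

```python
import datetime

def should_check(today: datetime.date, last: datetime.date) -> bool:
    # Strictly-greater comparison: the first run on a new day triggers
    # the credential check; later runs the same day are skipped.
    return today > last


d1 = datetime.date(2023, 12, 3)
d2 = datetime.date(2023, 12, 4)
print(should_check(d2, d1))  # True: new day, check the credential
print(should_check(d1, d1))  # False: already checked today
```

Because `anchor` is initialized to today's date at import time, the very first day after startup never triggers a check; this matches the "credential refresh starts from the next day" commit in the list above.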

utils.py Normal file

@@ -0,0 +1,37 @@
+from pathlib import Path
+
+import aiofiles
+import httpx
+from aiofiles.base import AiofilesContextManager
+from aiofiles.os import makedirs, remove
+from aiofiles.ospath import exists
+from aiofiles.threadpool.text import AsyncTextIOWrapper
+from bilibili_api import HEADERS
+
+client = httpx.AsyncClient(headers=HEADERS)
+
+
+async def download_content(url: str, path: Path) -> None:
+    async with client.stream("GET", url) as resp, aopen(path, "wb") as f:
+        async for chunk in resp.aiter_bytes(40960):
+            if not chunk:
+                return
+            await f.write(chunk)
+
+
+async def aexists(path: Path) -> bool:
+    return await exists(path)
+
+
+async def amakedirs(path: Path, exist_ok=False) -> None:
+    await makedirs(path, exist_ok=exist_ok)
+
+
+def aopen(
+    path: Path, mode: str = "r", **kwargs
+) -> AiofilesContextManager[None, None, AsyncTextIOWrapper]:
+    return aiofiles.open(path, mode, **kwargs)
+
+
+async def aremove(path: Path) -> None:
+    await remove(path)
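The new `utils.py` is a set of thin wrappers over `aiofiles` so callers can await filesystem operations without blocking the event loop. The same idea can be sketched with only the standard library, pushing the blocking `os` calls onto a worker thread via `asyncio.to_thread`; the helper names below merely mirror the ones in `utils.py` and are not the project's real implementations:

```python
import asyncio
import os
import tempfile
from pathlib import Path


async def aexists(path: Path) -> bool:
    # Run the blocking os.path.exists in a thread so the loop stays free.
    return await asyncio.to_thread(os.path.exists, path)


async def amakedirs(path: Path, exist_ok: bool = False) -> None:
    # Same idea for directory creation.
    await asyncio.to_thread(os.makedirs, path, exist_ok=exist_ok)


async def main() -> bool:
    with tempfile.TemporaryDirectory() as tmp:
        target = Path(tmp) / "a" / "b"
        assert not await aexists(target)
        await amakedirs(target, exist_ok=True)
        return await aexists(target)


print(asyncio.run(main()))  # True
```

`aiofiles` uses the same thread-offloading strategy internally; wrapping it behind `aexists`/`amakedirs`/`aopen`/`aremove` keeps the call sites short and makes it easy to swap the backend later.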