mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-06-28 11:12:00 +08:00
优化U115Pan类的文件上传功能,支持多线程并发上传和动态分片计算,提升上传效率和稳定性。
This commit is contained in:
@@ -4,8 +4,11 @@ import json
|
||||
import secrets
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
from queue import Queue
|
||||
from typing import List, Dict, Optional, Tuple, Union
|
||||
import io
|
||||
|
||||
import oss2
|
||||
import requests
|
||||
@@ -54,6 +57,11 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
# CID和路径缓存
|
||||
_id_cache: Dict[str, str] = {}
|
||||
|
||||
# 最大线程数
|
||||
MAX_WORKERS = 10
|
||||
# 最大分片大小(1GB)
|
||||
MAX_PART_SIZE = 1024 * 1024 * 1024
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.session = requests.Session()
|
||||
@@ -399,16 +407,63 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
modify_time=int(time.time())
|
||||
)
|
||||
|
||||
def _calc_parts(self, file_size: int) -> Tuple[int, int]:
|
||||
"""
|
||||
计算最优分片大小和线程数
|
||||
"""
|
||||
# 根据文件大小计算合适的分片数
|
||||
if file_size <= 100 * 1024 * 1024: # 小于100MB
|
||||
return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片
|
||||
elif file_size <= 1024 * 1024 * 1024: # 小于1GB
|
||||
return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片
|
||||
else:
|
||||
# 文件较大,使用较大分片
|
||||
part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE)
|
||||
return part_size, min(file_size // part_size + 1, self.MAX_WORKERS)
|
||||
|
||||
def _upload_part(self, bucket: oss2.Bucket, object_name: str, upload_id: str,
|
||||
part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo:
|
||||
"""
|
||||
上传单个分片
|
||||
"""
|
||||
try:
|
||||
result = bucket.upload_part(object_name, upload_id, part_number, part_data)
|
||||
part_info = PartInfo(part_number, result.etag)
|
||||
# 将上传进度放入队列
|
||||
progress_queue.put(len(part_data))
|
||||
logger.info(f"【115】分片 {part_number} 上传完成")
|
||||
return part_info
|
||||
except Exception as e:
|
||||
logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}")
|
||||
raise
|
||||
|
||||
def _log_progress(self, desc: str, total: int) -> tqdm:
|
||||
"""
|
||||
创建一个可以输出到日志的进度条
|
||||
"""
|
||||
class TqdmToLogger(io.StringIO):
|
||||
def write(s, buf):
|
||||
buf = buf.strip('\r\n\t ')
|
||||
if buf:
|
||||
logger.info(buf)
|
||||
|
||||
return tqdm(
|
||||
total=total,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc=desc,
|
||||
file=TqdmToLogger(),
|
||||
mininterval=1.0,
|
||||
maxinterval=5.0,
|
||||
miniters=1
|
||||
)
|
||||
|
||||
def upload(self, target_dir: schemas.FileItem, local_path: Path,
|
||||
new_name: Optional[str] = None) -> Optional[schemas.FileItem]:
|
||||
"""
|
||||
实现带秒传、断点续传和二次认证的文件上传
|
||||
实现带秒传、断点续传和多线程并发上传
|
||||
"""
|
||||
|
||||
def encode_callback(cb: dict):
|
||||
"""
|
||||
回调参数Base64编码函数
|
||||
"""
|
||||
return oss2.utils.b64encode_as_string(json.dumps(cb).strip())
|
||||
|
||||
target_name = new_name or local_path.name
|
||||
@@ -440,6 +495,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
if not init_resp.get("state"):
|
||||
logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}")
|
||||
return None
|
||||
|
||||
# 结果
|
||||
init_result = init_resp.get("data")
|
||||
logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}")
|
||||
@@ -457,15 +513,10 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
sign_checks = sign_check.split("-")
|
||||
start = int(sign_checks[0])
|
||||
end = int(sign_checks[1])
|
||||
# 计算指定区间的SHA1
|
||||
# sign_check (用下划线隔开,截取上传文内容的sha1)(单位是byte): "2392148-2392298"
|
||||
with open(local_path, "rb") as f:
|
||||
# 取2392148-2392298之间的内容(包含2392148、2392298)的sha1
|
||||
f.seek(start)
|
||||
chunk = f.read(end - start + 1)
|
||||
sign_val = hashlib.sha1(chunk).hexdigest().upper()
|
||||
# 重新初始化请求
|
||||
# sign_key,sign_val(根据sign_check计算的值大写的sha1值)
|
||||
init_data.update({
|
||||
"pick_code": pick_code,
|
||||
"sign_key": sign_key,
|
||||
@@ -478,7 +529,6 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
)
|
||||
if not init_resp:
|
||||
return None
|
||||
# 二次认证结果
|
||||
init_result = init_resp.get("data")
|
||||
logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}")
|
||||
if not pick_code:
|
||||
@@ -505,7 +555,7 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
logger.warn("【115】获取上传凭证失败")
|
||||
return None
|
||||
logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}")
|
||||
# 上传凭证
|
||||
|
||||
endpoint = token_resp.get("endpoint")
|
||||
AccessKeyId = token_resp.get("AccessKeyId")
|
||||
AccessKeySecret = token_resp.get("AccessKeySecret")
|
||||
@@ -528,69 +578,89 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
if resume_resp.get("callback"):
|
||||
callback = resume_resp["callback"]
|
||||
|
||||
# Step 6: 对象存储上传
|
||||
# Step 6: 多线程分片上传
|
||||
auth = oss2.StsAuth(
|
||||
access_key_id=AccessKeyId,
|
||||
access_key_secret=AccessKeySecret,
|
||||
security_token=SecurityToken
|
||||
)
|
||||
bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa
|
||||
# 处理oss请求回调
|
||||
callback_dict = json.loads(callback.get("callback"))
|
||||
callback_var_dict = json.loads(callback.get("callback_var"))
|
||||
# 补充参数
|
||||
logger.debug(f"【115】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}")
|
||||
# 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。
|
||||
# determine_part_size方法用于确定分片大小,设置分片大小为 100M
|
||||
part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024)
|
||||
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
||||
|
||||
# 计算分片大小和线程数
|
||||
part_size, workers = self._calc_parts(file_size)
|
||||
logger.info(f"【115】开始上传: {local_path} -> {target_path},"
|
||||
f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}")
|
||||
|
||||
# 初始化进度条
|
||||
logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}")
|
||||
progress_bar = tqdm(
|
||||
total=file_size,
|
||||
unit='B',
|
||||
unit_scale=True,
|
||||
desc="上传进度",
|
||||
ascii=True
|
||||
)
|
||||
progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size)
|
||||
|
||||
# 初始化分片
|
||||
# 初始化分片上传
|
||||
upload_id = bucket.init_multipart_upload(object_name,
|
||||
params={
|
||||
"encoding-type": "url",
|
||||
"sequential": ""
|
||||
}).upload_id
|
||||
parts = []
|
||||
# 逐个上传分片
|
||||
with open(local_path, 'rb') as fileobj:
|
||||
part_number = 1
|
||||
offset = 0
|
||||
while offset < file_size:
|
||||
num_to_upload = min(part_size, file_size - offset)
|
||||
# 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。
|
||||
logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}")
|
||||
result = bucket.upload_part(object_name, upload_id, part_number,
|
||||
data=SizedFileAdapter(fileobj, num_to_upload))
|
||||
parts.append(PartInfo(part_number, result.etag))
|
||||
logger.info(f"【115】{target_name} 分片 {part_number} 上传完成")
|
||||
offset += num_to_upload
|
||||
part_number += 1
|
||||
# 更新进度
|
||||
progress_bar.update(num_to_upload)
|
||||
params={
|
||||
"encoding-type": "url",
|
||||
"sequential": ""
|
||||
}).upload_id
|
||||
|
||||
# 关闭进度条
|
||||
if progress_bar:
|
||||
progress_bar.close()
|
||||
# 创建进度队列
|
||||
progress_queue = Queue()
|
||||
|
||||
# 创建线程池
|
||||
with ThreadPoolExecutor(max_workers=workers) as pool:
|
||||
futures = []
|
||||
parts = []
|
||||
|
||||
# 提交上传任务
|
||||
with open(local_path, 'rb') as fileobj:
|
||||
part_number = 1
|
||||
offset = 0
|
||||
while offset < file_size:
|
||||
size = min(part_size, file_size - offset)
|
||||
fileobj.seek(offset)
|
||||
part_data = fileobj.read(size)
|
||||
future = pool.submit(
|
||||
self._upload_part,
|
||||
bucket,
|
||||
object_name,
|
||||
upload_id,
|
||||
part_number,
|
||||
part_data,
|
||||
progress_queue
|
||||
)
|
||||
futures.append(future)
|
||||
offset += size
|
||||
part_number += 1
|
||||
|
||||
# 请求头
|
||||
# 更新进度条
|
||||
while len(parts) < len(futures):
|
||||
try:
|
||||
uploaded = progress_queue.get(timeout=1)
|
||||
progress_bar.update(uploaded)
|
||||
except:
|
||||
pass
|
||||
|
||||
# 等待所有任务完成
|
||||
for future in as_completed(futures):
|
||||
try:
|
||||
part_info = future.result()
|
||||
parts.append(part_info)
|
||||
except Exception as e:
|
||||
logger.error(f"【115】分片上传失败: {str(e)}")
|
||||
progress_bar.close()
|
||||
return None
|
||||
|
||||
# 按分片号排序
|
||||
parts.sort(key=lambda x: x.part_number)
|
||||
|
||||
# 完成上传
|
||||
headers = {
|
||||
'X-oss-callback': encode_callback(callback_dict),
|
||||
'x-oss-callback-var': encode_callback(callback_var_dict),
|
||||
'X-oss-callback': encode_callback(callback["callback"]),
|
||||
'x-oss-callback-var': encode_callback(callback["callback_var"]),
|
||||
'x-oss-forbid-overwrite': 'false'
|
||||
}
|
||||
|
||||
try:
|
||||
result = bucket.complete_multipart_upload(object_name, upload_id, parts,
|
||||
headers=headers)
|
||||
headers=headers)
|
||||
if result.status == 200:
|
||||
logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}")
|
||||
logger.info(f"【115】{target_name} 上传成功")
|
||||
@@ -603,6 +673,9 @@ class U115Pan(StorageBase, metaclass=Singleton):
|
||||
else:
|
||||
logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}")
|
||||
return None
|
||||
finally:
|
||||
progress_bar.close()
|
||||
|
||||
# 返回结果
|
||||
return self.get_item(target_path)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user