From 5eca5a601190035d08b508770c944d60c1fac173 Mon Sep 17 00:00:00 2001 From: jxxghp Date: Thu, 8 May 2025 09:47:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96U115Pan=E7=B1=BB=E7=9A=84?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8A=E4=BC=A0=E5=8A=9F=E8=83=BD=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=92=8C=E5=8A=A8=E6=80=81=E5=88=86=E7=89=87?= =?UTF-8?q?=E8=AE=A1=E7=AE=97=EF=BC=8C=E6=8F=90=E5=8D=87=E4=B8=8A=E4=BC=A0?= =?UTF-8?q?=E6=95=88=E7=8E=87=E5=92=8C=E7=A8=B3=E5=AE=9A=E6=80=A7=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/modules/filemanager/storages/u115.py | 191 ++++++++++++++++------- 1 file changed, 132 insertions(+), 59 deletions(-) diff --git a/app/modules/filemanager/storages/u115.py b/app/modules/filemanager/storages/u115.py index 16bbc204..396024e1 100644 --- a/app/modules/filemanager/storages/u115.py +++ b/app/modules/filemanager/storages/u115.py @@ -4,8 +4,11 @@ import json import secrets import threading import time +from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path +from queue import Queue from typing import List, Dict, Optional, Tuple, Union +import io import oss2 import requests @@ -54,6 +57,11 @@ class U115Pan(StorageBase, metaclass=Singleton): # CID和路径缓存 _id_cache: Dict[str, str] = {} + # 最大线程数 + MAX_WORKERS = 10 + # 最大分片大小(1GB) + MAX_PART_SIZE = 1024 * 1024 * 1024 + def __init__(self): super().__init__() self.session = requests.Session() @@ -399,16 +407,63 @@ class U115Pan(StorageBase, metaclass=Singleton): modify_time=int(time.time()) ) + def _calc_parts(self, file_size: int) -> Tuple[int, int]: + """ + 计算最优分片大小和线程数 + """ + # 根据文件大小计算合适的分片数 + if file_size <= 100 * 1024 * 1024: # 小于100MB + return 10 * 1024 * 1024, min(3, self.MAX_WORKERS) # 10MB分片 + elif file_size <= 1024 * 1024 * 1024: # 小于1GB + return 100 * 1024 * 1024, min(5, self.MAX_WORKERS) # 100MB分片 + else: + # 文件较大,使用较大分片 + part_size = min(file_size // self.MAX_WORKERS, self.MAX_PART_SIZE) + return part_size, min(file_size // part_size + 1, self.MAX_WORKERS) + + def _upload_part(self, bucket: oss2.Bucket, object_name: str, upload_id: str, + part_number: int, part_data: bytes, progress_queue: Queue) -> PartInfo: + """ + 上传单个分片 + """ + try: + result = bucket.upload_part(object_name, upload_id, part_number, part_data) + part_info = PartInfo(part_number, result.etag) + # 将上传进度放入队列 + progress_queue.put(len(part_data)) + logger.info(f"【115】分片 {part_number} 上传完成") + return part_info + except Exception as e: + logger.error(f"【115】分片 {part_number} 上传失败: {str(e)}") + raise + + def _log_progress(self, desc: str, total: int) -> tqdm: + """ + 创建一个可以输出到日志的进度条 + """ + class TqdmToLogger(io.StringIO): + def write(s, buf): + buf = buf.strip('\r\n\t ') + if buf: + logger.info(buf) + + return tqdm( + total=total, + unit='B', + unit_scale=True, + desc=desc, + file=TqdmToLogger(), + mininterval=1.0, + maxinterval=5.0, + miniters=1 + ) + def upload(self, target_dir: schemas.FileItem, local_path: Path, new_name: Optional[str] = None) -> Optional[schemas.FileItem]: """ - 实现带秒传、断点续传和二次认证的文件上传 + 实现带秒传、断点续传和多线程并发上传 """ - def encode_callback(cb: dict): - """ - 回调参数Base64编码函数 - """ return oss2.utils.b64encode_as_string(json.dumps(cb).strip()) target_name = new_name or local_path.name @@ -440,6 +495,7 @@ class U115Pan(StorageBase, metaclass=Singleton): if not init_resp.get("state"): logger.warn(f"【115】初始化上传失败: {init_resp.get('error')}") return None + # 结果 init_result = init_resp.get("data") logger.debug(f"【115】上传 Step 1 初始化结果: {init_result}") @@ -457,15 +513,10 @@ class U115Pan(StorageBase, metaclass=Singleton): sign_checks = sign_check.split("-") start = int(sign_checks[0]) end = int(sign_checks[1]) - # 计算指定区间的SHA1 - # sign_check (用下划线隔开,截取上传文内容的sha1)(单位是byte): "2392148-2392298" with open(local_path, "rb") as f: - # 取2392148-2392298之间的内容(包含2392148、2392298)的sha1 f.seek(start) chunk = f.read(end - start + 1) sign_val = hashlib.sha1(chunk).hexdigest().upper() - # 重新初始化请求 - # sign_key,sign_val(根据sign_check计算的值大写的sha1值) init_data.update({ "pick_code": pick_code, "sign_key": sign_key, @@ -478,7 +529,6 @@ class U115Pan(StorageBase, metaclass=Singleton): ) if not init_resp: return None - # 二次认证结果 init_result = init_resp.get("data") logger.debug(f"【115】上传 Step 2 二次认证结果: {init_result}") if not pick_code: @@ -505,7 +555,7 @@ class U115Pan(StorageBase, metaclass=Singleton): logger.warn("【115】获取上传凭证失败") return None logger.debug(f"【115】上传 Step 4 获取上传凭证结果: {token_resp}") - # 上传凭证 + endpoint = token_resp.get("endpoint") AccessKeyId = token_resp.get("AccessKeyId") AccessKeySecret = token_resp.get("AccessKeySecret") @@ -528,69 +578,89 @@ class U115Pan(StorageBase, metaclass=Singleton): if resume_resp.get("callback"): callback = resume_resp["callback"] - # Step 6: 对象存储上传 + # Step 6: 多线程分片上传 auth = oss2.StsAuth( access_key_id=AccessKeyId, access_key_secret=AccessKeySecret, security_token=SecurityToken ) - bucket = oss2.Bucket(auth, endpoint, bucket_name) # noqa - # 处理oss请求回调 - callback_dict = json.loads(callback.get("callback")) - callback_var_dict = json.loads(callback.get("callback_var")) - # 补充参数 - logger.debug(f"【115】上传 Step 6 回调参数:{callback_dict} {callback_var_dict}") - # 填写不能包含Bucket名称在内的Object完整路径,例如exampledir/exampleobject.txt。 - # determine_part_size方法用于确定分片大小,设置分片大小为 100M - part_size = determine_part_size(file_size, preferred_size=100 * 1024 * 1024) + bucket = oss2.Bucket(auth, endpoint, bucket_name) + + # 计算分片大小和线程数 + part_size, workers = self._calc_parts(file_size) + logger.info(f"【115】开始上传: {local_path} -> {target_path}," + f"分片大小:{StringUtils.str_filesize(part_size)},线程数:{workers}") # 初始化进度条 - logger.info(f"【115】开始上传: {local_path} -> {target_path},分片大小:{StringUtils.str_filesize(part_size)}") - progress_bar = tqdm( - total=file_size, - unit='B', - unit_scale=True, - desc="上传进度", - ascii=True - ) + progress_bar = self._log_progress(f"【115】{target_name} 上传进度", file_size) - # 初始化分片 + # 初始化分片上传 upload_id = bucket.init_multipart_upload(object_name, - params={ - "encoding-type": "url", - "sequential": "" - }).upload_id - parts = [] - # 逐个上传分片 - with open(local_path, 'rb') as fileobj: - part_number = 1 - offset = 0 - while offset < file_size: - num_to_upload = min(part_size, file_size - offset) - # 调用SizedFileAdapter(fileobj, size)方法会生成一个新的文件对象,重新计算起始追加位置。 - logger.info(f"【115】开始上传 {target_name} 分片 {part_number}: {offset} -> {offset + num_to_upload}") - result = bucket.upload_part(object_name, upload_id, part_number, - data=SizedFileAdapter(fileobj, num_to_upload)) - parts.append(PartInfo(part_number, result.etag)) - logger.info(f"【115】{target_name} 分片 {part_number} 上传完成") - offset += num_to_upload - part_number += 1 - # 更新进度 - progress_bar.update(num_to_upload) + params={ + "encoding-type": "url", + "sequential": "" + }).upload_id - # 关闭进度条 - if progress_bar: - progress_bar.close() + # 创建进度队列 + progress_queue = Queue() + + # 创建线程池 + with ThreadPoolExecutor(max_workers=workers) as pool: + futures = [] + parts = [] + + # 提交上传任务 + with open(local_path, 'rb') as fileobj: + part_number = 1 + offset = 0 + while offset < file_size: + size = min(part_size, file_size - offset) + fileobj.seek(offset) + part_data = fileobj.read(size) + future = pool.submit( + self._upload_part, + bucket, + object_name, + upload_id, + part_number, + part_data, + progress_queue + ) + futures.append(future) + offset += size + part_number += 1 - # 请求头 + # 更新进度条 + while len(parts) < len(futures): + try: + uploaded = progress_queue.get(timeout=1) + progress_bar.update(uploaded) + except: + pass + + # 等待所有任务完成 + for future in as_completed(futures): + try: + part_info = future.result() + parts.append(part_info) + except Exception as e: + logger.error(f"【115】分片上传失败: {str(e)}") + progress_bar.close() + return None + + # 按分片号排序 + parts.sort(key=lambda x: x.part_number) + + # 完成上传 headers = { - 'X-oss-callback': encode_callback(callback_dict), - 'x-oss-callback-var': encode_callback(callback_var_dict), + 'X-oss-callback': encode_callback(callback["callback"]), + 'x-oss-callback-var': encode_callback(callback["callback_var"]), 'x-oss-forbid-overwrite': 'false' } + try: result = bucket.complete_multipart_upload(object_name, upload_id, parts, - headers=headers) + headers=headers) if result.status == 200: logger.debug(f"【115】上传 Step 6 回调结果:{result.resp.response.json()}") logger.info(f"【115】{target_name} 上传成功") @@ -603,6 +673,9 @@ class U115Pan(StorageBase, metaclass=Singleton): else: logger.error(f"【115】{target_name} 上传失败: {e.status}, 错误码: {e.code}, 详情: {e.message}") return None + finally: + progress_bar.close() + # 返回结果 return self.get_item(target_path)