From 676dacce414fd019ef7bedad64ea19151eff4650 Mon Sep 17 00:00:00 2001 From: shiyu Date: Sun, 3 May 2026 07:51:55 +0800 Subject: [PATCH] feat: update PikPak adapter configuration and enhance token handling --- domain/adapters/providers/pikpak.py | 77 ++++++++++++++++++++--------- 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/domain/adapters/providers/pikpak.py b/domain/adapters/providers/pikpak.py index 4f70074..db95d71 100644 --- a/domain/adapters/providers/pikpak.py +++ b/domain/adapters/providers/pikpak.py @@ -13,8 +13,9 @@ from models import StorageAdapter from .base import BaseAdapter -API_BASE = "https://api-drive.mypikpak.net/drive/v1" -USER_BASE = "https://user.mypikpak.net/v1" +API_BASE = "https://api-drive.mypikpak.com/drive/v1" +USER_BASE = "https://user.mypikpak.com/v1" +TOKEN_REFRESH_BUFFER = 300 ANDROID_ALGORITHMS = [ "SOP04dGzk0TNO7t7t9ekDbAmx+eq0OI1ovEx", @@ -62,11 +63,11 @@ PLATFORM_CONFIG = { "android": { "client_id": "YNxT9w7GMdWvEOKa", "client_secret": "dbw2OtmVEeuUvIptb1Coyg", - "client_version": "1.53.2", + "client_version": "1.21.0", "package_name": "com.pikcloud.pikpak", "sdk_version": "2.0.6.206003", "algorithms": ANDROID_ALGORITHMS, - "ua": None, + "ua": "ANDROID-com.pikcloud.pikpak/1.21.0", }, "web": { "client_id": "YUMx5nI8ZU8Ap8pm", @@ -109,6 +110,13 @@ def _as_bool(value: Any, default: bool = False) -> bool: return bool(value) +def _as_int(value: Any, default: int = 0) -> int: + try: + return int(value or default) + except Exception: + return default + + def _root_payload(root: str | None) -> Tuple[str, str]: raw = (root or "").strip() if not raw: @@ -158,9 +166,9 @@ class PikPakAdapter: if not self.username or not self.password: raise ValueError("PikPak adapter requires username and password") - self.platform = str(cfg.get("platform") or "web").strip().lower() + self.platform = str(cfg.get("platform") or "android").strip().lower() if self.platform not in PLATFORM_CONFIG: - self.platform = "web" + self.platform = "android" platform_cfg = PLATFORM_CONFIG[self.platform] self.client_id = str(platform_cfg["client_id"]) @@ -170,10 +178,14 @@ class PikPakAdapter: self.sdk_version = str(platform_cfg["sdk_version"]) self.algorithms = list(platform_cfg["algorithms"]) - self.device_id = str(cfg.get("device_id") or "").strip() or _md5_text(self.username + self.password) + device_id = str(cfg.get("device_id") or "").strip() + if not device_id or device_id == _md5_text(self.username + self.password): + device_id = _md5_text(self.username) + self.device_id = device_id self.user_id = str(cfg.get("user_id") or "").strip() self.refresh_token = str(cfg.get("refresh_token") or "").strip() self.access_token = str(cfg.get("access_token") or "").strip() + self.expires_at = _as_int(cfg.get("expires_at"), 0) self.captcha_token = str(cfg.get("captcha_token") or "").strip() self.root_id = str(cfg.get("root_id") or "").strip() self.disable_media_link = _as_bool(cfg.get("disable_media_link"), True) @@ -232,6 +244,18 @@ class PikPakAdapter: path = m.group(1) if m else "/" return f"{method.upper()}:{path}" + @staticmethod + def _full_action(method: str, url: str) -> str: + return f"{method.upper()}:{url}" + + def _captcha_action(self, method: str, url: str, *, auth: bool) -> str: + if not auth and url == f"{USER_BASE}/auth/signin": + return self._full_action(method, url) + return self._action(method, url) + + def _has_valid_access_token(self) -> bool: + return bool(self.access_token and self.expires_at > int(time.time()) + TOKEN_REFRESH_BUFFER) + def _download_headers(self) -> Dict[str, str]: headers = { "User-Agent": self.user_agent, @@ -247,6 +271,8 @@ class PikPakAdapter: changed = False for key, value in ( ("refresh_token", self.refresh_token), + ("access_token", self.access_token), + ("expires_at", self.expires_at), ("captcha_token", self.captcha_token), ("device_id", self.device_id), ("user_id", self.user_id), @@ -261,10 +287,10 @@ class PikPakAdapter: await self.record.save(update_fields=["config"]) async def _ensure_auth(self): - if self.access_token: + if self._has_valid_access_token(): return async with self._auth_lock: - if self.access_token: + if self._has_valid_access_token(): return if self.refresh_token: try: @@ -281,22 +307,24 @@ class PikPakAdapter: async def _login(self): url = f"{USER_BASE}/auth/signin" if not self.captcha_token: - await self._refresh_captcha_token(self._action("POST", url), self._login_captcha_meta()) + await self._refresh_captcha_token(self._full_action("POST", url), self._login_captcha_meta()) body = { "captcha_token": self.captcha_token, "client_id": self.client_id, "client_secret": self.client_secret, + "grant_type": "password", "username": self.username, "password": self.password, } - data = await self._raw_json("POST", url, json=body, params={"client_id": self.client_id}, auth=False) + data = await self._raw_json("POST", url, json=body, auth=False) self.refresh_token = str(data.get("refresh_token") or "").strip() self.access_token = str(data.get("access_token") or "").strip() + self.expires_at = int(time.time()) + _as_int(data.get("expires_in"), 0) self.user_id = str(data.get("sub") or self.user_id).strip() if not self.refresh_token or not self.access_token: raise HTTPException(502, detail="PikPak login failed: missing token") - if self.platform == "android": + if self.platform == "android" and not PLATFORM_CONFIG[self.platform].get("ua"): self.user_agent = self._build_android_user_agent() await self._save_runtime_config() @@ -308,21 +336,18 @@ class PikPakAdapter: "grant_type": "refresh_token", "refresh_token": self.refresh_token, } - data = await self._raw_json("POST", url, json=body, params={"client_id": self.client_id}, auth=False) + data = await self._raw_json("POST", url, json=body, auth=False) self.refresh_token = str(data.get("refresh_token") or "").strip() self.access_token = str(data.get("access_token") or "").strip() + self.expires_at = int(time.time()) + _as_int(data.get("expires_in"), 0) self.user_id = str(data.get("sub") or self.user_id).strip() if not self.refresh_token or not self.access_token: raise HTTPException(502, detail="PikPak refresh token failed: missing token") - if self.platform == "android": + if self.platform == "android" and not PLATFORM_CONFIG[self.platform].get("ua"): self.user_agent = self._build_android_user_agent() await self._save_runtime_config() def _login_captcha_meta(self) -> Dict[str, str]: - if re.match(r"\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*", self.username): - return {"email": self.username} - if 11 <= len(self.username) <= 18: - return {"phone_number": self.username} return {"username": self.username} async def _refresh_captcha_token(self, action: str, meta: Dict[str, str]): @@ -335,7 +360,7 @@ class PikPakAdapter: "meta": meta, "redirect_uri": "xlaccsdk01://xbase.cloud/callback?state=harbor", } - data = await self._raw_json("POST", url, json=body, params={"client_id": self.client_id}, auth=False) + data = await self._raw_json("POST", url, json=body, auth=False) verify_url = str(data.get("url") or "").strip() token = str(data.get("captcha_token") or "").strip() if token and not verify_url: @@ -440,9 +465,15 @@ class PikPakAdapter: if self.user_id: await self._refresh_captcha_token_after_login(method, url) else: - await self._refresh_captcha_token(self._action(method, url), self._login_captcha_meta()) + await self._refresh_captcha_token( + self._captcha_action(method, url, auth=auth), + self._login_captcha_meta(), + ) else: - await self._refresh_captcha_token(self._action(method, url), self._login_captcha_meta()) + await self._refresh_captcha_token( + self._captcha_action(method, url, auth=auth), + self._login_captcha_meta(), + ) return await self._raw_json( method, url, @@ -879,8 +910,10 @@ ADAPTER_TYPE = "pikpak" CONFIG_SCHEMA = [ {"key": "username", "label": "PikPak 账号", "type": "string", "required": True}, {"key": "password", "label": "PikPak 密码", "type": "password", "required": True}, - {"key": "platform", "label": "平台", "type": "select", "required": False, "default": "web", "options": ["web", "android", "pc"]}, + {"key": "platform", "label": "平台", "type": "select", "required": False, "default": "android", "options": ["android", "web", "pc"]}, {"key": "refresh_token", "label": "Refresh Token", "type": "password", "required": False}, + {"key": "access_token", "label": "Access Token", "type": "password", "required": False}, + {"key": "expires_at", "label": "Access Token 过期时间戳", "type": "number", "required": False}, {"key": "captcha_token", "label": "Captcha Token", "type": "password", "required": False}, {"key": "device_id", "label": "Device ID", "type": "string", "required": False}, {"key": "root_id", "label": "根目录 ID", "type": "string", "required": False, "default": ""},