feat(download): 添加快手下载器并优化下载配置功能

- 新增快手下载器,支持快手视频下载
- 添加下载配置页面,可设置各平台Cookies
- 优化后端接口,增加获取和更新Cookies的功能
- 前端新增Downloader组件和相关表单组件
- 更新路由配置,增加下载配置相关路由
This commit is contained in:
黄建武
2025-05-08 18:15:59 +08:00
parent 321d22271a
commit 21c9d47495
21 changed files with 1106 additions and 413 deletions

View File

@@ -1,5 +1,5 @@
from fastapi import FastAPI
from .routers import note, provider,model
from .routers import note, provider, model, config
def create_app() -> FastAPI:
@@ -7,4 +7,5 @@ def create_app() -> FastAPI:
app.include_router(note.router, prefix="/api")
app.include_router(provider.router, prefix="/api")
app.include_router(model.router,prefix="/api")
app.include_router(config.router, prefix="/api")
return app

View File

@@ -13,13 +13,14 @@ from app.downloaders.base import Downloader
from app.downloaders.douyin_helper.abogus import ABogus
from app.enmus.note_enums import DownloadQuality
from app.models.audio_model import AudioDownloadResult
from app.services.cookie_manager import CookieConfigManager
from app.utils.path_helper import get_data_dir
from dotenv import load_dotenv
load_dotenv()
DOUYIN_DOMAIN = "https://www.douyin.com"
cfm=CookieConfigManager()
def get_timestamp(unit: str = "milli"):
"""
根据给定的单位获取当前时间 (Get the current time based on the given unit)
@@ -112,7 +113,7 @@ class DouyinDownloader(Downloader):
def __init__(self, cookie=None):
super().__init__()
self.headers_config = DouyinConfig.HEADERS.copy()
self.headers_config["Cookie"] = os.getenv('DOUYIN_COOKIES')
self.headers_config["Cookie"] = cfm.get('douyin')
print(self.headers_config)
self.proxies_config = DouyinConfig.PROXIES.copy()
self.ttwid_config = DouyinConfig.TTWID.copy()

View File

@@ -0,0 +1,97 @@
import os
import subprocess
from abc import ABC
from typing import Union, Optional
import requests
from app.downloaders.base import Downloader
from app.downloaders.kuaishou_helper.kuaishou import KuaiShou
from app.enmus.note_enums import DownloadQuality
from app.models.audio_model import AudioDownloadResult
from app.utils.path_helper import get_data_dir
class KuaiShouDownloader(Downloader, ABC):
def __init__(self):
super().__init__()
def download(
self,
video_url: str,
output_dir: Union[str, None] = None,
quality: str = "fast",
need_video: Optional[bool] = False
) -> AudioDownloadResult:
if output_dir is None:
output_dir = get_data_dir()
if not output_dir:
output_dir = self.cache_data
os.makedirs(output_dir, exist_ok=True)
ks = KuaiShou()
video_raw_info = ks.run(video_url)
print(video_raw_info)
photo_info = video_raw_info['visionVideoDetail']['photo']
video_id = photo_info['id']
title = photo_info['caption'].strip().replace('\n', '').replace(' ', '_')[:50]
mp4_path = os.path.join(output_dir, f"{video_id}.mp4")
mp3_path = os.path.join(output_dir, f"{video_id}.mp3")
if os.path.exists(mp3_path):
print(f"[已存在] 跳过下载: {mp3_path}")
return AudioDownloadResult(
file_path=mp3_path,
title=title,
duration=photo_info['duration'],
cover_url=photo_info['coverUrl'],
platform="kuaishou",
video_id=video_id,
raw_info={
'tags': ','.join(tag['name'] for tag in video_raw_info.get('tags', []) if tag.get('name'))
},
video_path=mp4_path
)
# 下载 mp4 视频
resp = requests.get(photo_info['photoUrl'], stream=True)
if resp.status_code == 200:
with open(mp4_path, "wb") as f:
for chunk in resp.iter_content(1024 * 1024):
f.write(chunk)
else:
raise Exception(f"视频下载失败: {resp.status_code}")
# 使用 ffmpeg 转换为 mp3
try:
subprocess.run([
"ffmpeg", "-y", "-i", mp4_path, "-vn", "-acodec", "libmp3lame", mp3_path
], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
raise Exception("ffmpeg 转换 MP3 失败")
return AudioDownloadResult(
file_path=mp3_path,
title=photo_info['caption'],
duration=photo_info['duration'],
cover_url=photo_info['coverUrl'],
platform="kuaishou",
video_id=video_id,
raw_info={
'tags': ','.join(tag['name'] for tag in video_raw_info.get('tags', []) if tag.get('name'))
},
video_path=mp4_path
)
def download_video(
self,
video_url: str,
output_dir: Union[str, None] = None,
) -> str:
print('self.download(video_url, output_dir).video_path',self.download(video_url, output_dir).video_path)
return self.download(video_url, output_dir).video_path
if __name__ == '__main__':
ks = KuaiShouDownloader()
ks.download('https://v.kuaishou.com/2vBqX74 王宝强携手刘昊然、岳云鹏上演精彩名场面 全程高能 看一遍笑一遍 "唐探1900 "快成长计划 ...更多')

View File

@@ -0,0 +1,101 @@
import logging
import os
import re
import requests
from dotenv import load_dotenv
from app.services.cookie_manager import CookieConfigManager
from app.utils.logger import get_logger
KUAISHOU_API_BASE = 'https://www.kuaishou.com/graphql'
KUAISHOU_URL = "https://www.kuaishou.com/"
load_dotenv()
headers = {
'Accept-Language': 'zh-CN,zh;q=0.9',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
# 'Cookie': 'did=web_9e8cfa4403000587b9e7d67233e6b04c; didv=1719811812378; kpf=PC_WEB; clientid=3; kpn=KUAISHOU_VISION',
'Origin': 'https://www.kuaishou.com',
'Pragma': 'no-cache',
'Referer': 'https://www.kuaishou.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
'accept': '*/*',
'content-type': 'application/json',
'sec-ch-ua': '"Not/A)Brand";v="8", "Chromium";v="126", "Google Chrome";v="126"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
# 'Cookie':cookies.strip()
}
logger = get_logger(__name__)
cfm=CookieConfigManager()
class KuaiShou:
def __init__(self):
self.header = headers.copy()
self.cookie = None
@staticmethod
def _extract_kuaishou_link(text):
url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text)
return url[0]
def get_photo_id(self, url):
response = requests.get(url, allow_redirects=True, headers=self.header)
real_url = response.url
# 提取short—video/后面的id
pattern = re.compile(r'short-video/(\w+)')
match = pattern.search(real_url)
return match.group().split('/')[1]
def get_temp_cookies(self):
is_exist = cfm.get('kuaishou')
print(is_exist)
if is_exist:
return is_exist
res = requests.get(url=KUAISHOU_URL, headers=self.header, allow_redirects=True)
cookie_string = '; '.join([f"{k}={v}" for k, v in res.cookies.get_dict().items()])
return cookie_string
def get_video_details(self, url, photo_id):
json_data = {
'operationName': 'visionVideoDetail',
"variables": {"photoId": photo_id, "page": "detail"},
"query": "query visionVideoDetail($photoId: String, $type: String, $page: String, $webPageArea: String) {\n visionVideoDetail(photoId: $photoId, type: $type, page: $page, webPageArea: $webPageArea) {\n status\n type\n author {\n id\n name\n following\n headerUrl\n __typename\n }\n photo {\n id\n duration\n caption\n likeCount\n realLikeCount\n coverUrl\n photoUrl\n liked\n timestamp\n expTag\n llsid\n viewCount\n videoRatio\n stereoType\n croppedPhotoUrl\n manifest {\n mediaType\n businessType\n version\n adaptationSet {\n id\n duration\n representation {\n id\n defaultSelect\n backupUrl\n codecs\n url\n height\n width\n avgBitrate\n maxBitrate\n m3u8Slice\n qualityType\n qualityLabel\n frameRate\n featureP2sp\n hidden\n disableAdaptive\n __typename\n }\n __typename\n }\n __typename\n }\n __typename\n }\n tags {\n type\n name\n __typename\n }\n commentLimit {\n canAddComment\n __typename\n }\n llsid\n danmakuSwitch\n __typename\n }\n}\n"
}
response = requests.post(url=KUAISHOU_API_BASE, headers=self.header, json=json_data)
if response.status_code == 200:
response.raise_for_status()
return response.json()
else:
return None
def run(self, url):
real_url = self._extract_kuaishou_link(url)
if not real_url:
logger.error(f"快手视频 URL 解析失败 {url}")
cookies = self.get_temp_cookies()
if not cookies:
logger.error(f"快手视频 cookies 解析失败 {url},请考虑设置环境变量 KUAISHOU_COOKIES")
self.header['Cookie'] = cookies.strip()
photo_id = self.get_photo_id(real_url)
if photo_id is None:
logger.error(f"快手视频 ID 解析失败 {url}")
video_details = self.get_video_details(real_url, photo_id)
print(video_details)
if video_details is None:
logger.error(f"快手视频详情解析失败 {url}")
return video_details['data']
if __name__ == '__main__':
ks = KuaiShou()
ks.run(
'https://v.kuaishou.com/2vBqX74 王宝强携手刘昊然、岳云鹏上演精彩名场面 全程高能 看一遍笑一遍 "唐探1900 "快成长计划 ...更多')

View File

@@ -0,0 +1,30 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Optional
from app.utils.response import ResponseWrapper as R
from app.services.cookie_manager import CookieConfigManager
router = APIRouter()
cookie_manager = CookieConfigManager()
class CookieUpdateRequest(BaseModel):
platform: str
cookie: str
@router.get("/get_downloader_cookie/{platform}")
def get_cookie(platform: str):
cookie = cookie_manager.get(platform)
if not cookie:
return R.success(msg='未找到Cookies')
return R.success(
data={"platform": platform, "cookie": cookie}
)
@router.post("/update_downloader_cookie")
def update_cookie(data: CookieUpdateRequest):
cookie_manager.set(data.platform, data.cookie)
return {"message": "Cookie updated successfully"}

View File

@@ -1,5 +1,6 @@
from app.downloaders.bilibili_downloader import BilibiliDownloader
from app.downloaders.douyin_downloader import DouyinDownloader
from app.downloaders.kuaishou_downloader import KuaiShouDownloader
from app.downloaders.local_downloader import LocalDownloader
from app.downloaders.youtube_downloader import YoutubeDownloader
@@ -7,6 +8,7 @@ SUPPORT_PLATFORM_MAP = {
'youtube':YoutubeDownloader(),
'bilibili':BilibiliDownloader(),
'tiktok':DouyinDownloader(),
'kuaishou':KuaiShouDownloader(),
'douyin':DouyinDownloader(),
'local':LocalDownloader()
}

View File

@@ -0,0 +1,44 @@
import json
from pathlib import Path
from typing import Optional, Dict
class CookieConfigManager:
def __init__(self, filepath: str = "config/downloader.json"):
self.path = Path(filepath)
self.path.parent.mkdir(parents=True, exist_ok=True)
if not self.path.exists():
self._write({})
def _read(self) -> Dict[str, Dict[str, str]]:
try:
with self.path.open("r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
def _write(self, data: Dict[str, Dict[str, str]]):
with self.path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def get(self, platform: str) -> Optional[str]:
data = self._read()
return data.get(platform, {}).get("cookie")
def set(self, platform: str, cookie: str):
data = self._read()
data[platform] = {"cookie": cookie}
self._write(data)
def delete(self, platform: str):
data = self._read()
if platform in data:
del data[platform]
self._write(data)
def list_all(self) -> Dict[str, str]:
data = self._read()
return {k: v.get("cookie", "") for k, v in data.items()}
def exists(self, platform: str) -> bool:
return self.get(platform) is not None

View File

@@ -1,24 +1,30 @@
from pydantic import AnyUrl, validator, BaseModel
from pydantic import AnyUrl, validator, BaseModel, field_validator
import re
SUPPORTED_PLATFORMS = {
"bilibili": r"(https?://)?(www\.)?bilibili\.com/video/[a-zA-Z0-9]+",
"youtube": r"(https?://)?(www\.)?(youtube\.com/watch\?v=|youtu\.be/)[\w\-]+",
"douyin": r"'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F])"
"douyin": "douyin",
"kuaishou": "kuaishou"
}
def is_supported_video_url(url: str) -> bool:
return any(re.match(pattern, url) for pattern in SUPPORTED_PLATFORMS.values())
for name, pattern in SUPPORTED_PLATFORMS.items():
if pattern in ["douyin", "kuaishou"]:
if pattern in url:
return True
else:
if re.match(pattern, url):
return True
return False
class VideoRequest(BaseModel):
url: AnyUrl
platform: str
@validator("url")
@field_validator("url")
def validate_video_url(cls, v):
if not is_supported_video_url(str(v)):
raise ValueError("暂不支持该视频平台或链接格式无效")

Binary file not shown.