feat(downloaders): 添加抖音视频识别功能

- 新增 abogus.py 文件,实现 a_bogus 参数的生成逻辑
- 代码源自 JoeanAmier/TikTokDownloader 项目,并进行了适配和优化
- 功能包括生成用户代理字符串、加密 URL 参数和生成最终的 a_bogus 值
- 提供了详细的注释和函数说明,便于理解和维护
This commit is contained in:
黄建武
2025-05-02 14:00:29 +08:00
parent 7066b4288a
commit 04dad3b72a
7 changed files with 957 additions and 123 deletions

View File

@@ -54,7 +54,7 @@ const formSchema = z
.superRefine((data, ctx) => {
const { video_url, platform } = data
if (platform === 'local') {
if (platform === 'local' || platform === 'douyin') {
if (!video_url || typeof video_url !== 'string') {
ctx.addIssue({
code: z.ZodIssueCode.custom,
@@ -249,7 +249,7 @@ const NoteForm = () => {
<SelectContent>
<SelectItem value="bilibili"></SelectItem>
<SelectItem value="youtube">Youtube</SelectItem>
{/*<SelectItem value="douyin">抖音</SelectItem>*/}
<SelectItem value="douyin"></SelectItem>
<SelectItem value="local"></SelectItem>
</SelectContent>
</Select>
@@ -335,45 +335,45 @@ const NoteForm = () => {
{/* 支持哔哩哔哩视频链接,例如:*/}
{/* https://www.bilibili.com/video/BV1vc25YQE9X/*/}
{/*</p>*/}
<FormField
control={form.control}
name="quality"
render={({ field }) => (
<FormItem>
<div className="my-3 flex items-center justify-between">
<h2 className="block"></h2>
<TooltipProvider>
<Tooltip>
<TooltipTrigger asChild>
<Info className="hover:text-primary h-4 w-4 cursor-pointer text-neutral-400" />
</TooltipTrigger>
<TooltipContent>
<p className="max-w-[200px] text-xs">
</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
</div>
<Select onValueChange={field.onChange} defaultValue={field.value}>
<FormControl>
<SelectTrigger className="w-full">
<SelectValue placeholder="选择质量" />
</SelectTrigger>
</FormControl>
<SelectContent>
<SelectItem value="fast"></SelectItem>
<SelectItem value="medium"></SelectItem>
<SelectItem value="slow"></SelectItem>
</SelectContent>
</Select>
{/*<FormDescription className="text-xs text-neutral-500">*/}
{/* 质量越高,下载体积越大,速度越慢*/}
{/*</FormDescription>*/}
<FormMessage />
</FormItem>
)}
/>
{/*<FormField*/}
{/* control={form.control}*/}
{/* name="quality"*/}
{/* render={({ field }) => (*/}
{/* <FormItem>*/}
{/* <div className="my-3 flex items-center justify-between">*/}
{/* <h2 className="block">音频质量</h2>*/}
{/* <TooltipProvider>*/}
{/* <Tooltip>*/}
{/* <TooltipTrigger asChild>*/}
{/* <Info className="hover:text-primary h-4 w-4 cursor-pointer text-neutral-400" />*/}
{/* </TooltipTrigger>*/}
{/* <TooltipContent>*/}
{/* <p className="max-w-[200px] text-xs">*/}
{/* 质量越高,下载体积越大,速度越慢*/}
{/* </p>*/}
{/* </TooltipContent>*/}
{/* </Tooltip>*/}
{/* </TooltipProvider>*/}
{/* </div>*/}
{/* <Select onValueChange={field.onChange} defaultValue={field.value}>*/}
{/* <FormControl>*/}
{/* <SelectTrigger className="w-full">*/}
{/* <SelectValue placeholder="选择质量" />*/}
{/* </SelectTrigger>*/}
{/* </FormControl>*/}
{/* <SelectContent>*/}
{/* <SelectItem value="fast">快速(压缩)</SelectItem>*/}
{/* <SelectItem value="medium">中等(推荐)</SelectItem>*/}
{/* <SelectItem value="slow">高质量(清晰)</SelectItem>*/}
{/* </SelectContent>*/}
{/* </Select>*/}
{/* /!*<FormDescription className="text-xs text-neutral-500">*!/*/}
{/* /!* 质量越高,下载体积越大,速度越慢*!/*/}
{/* /!*</FormDescription>*!/*/}
{/* <FormMessage />*/}
{/* </FormItem>*/}
{/* )}*/}
{/*/>*/}
<FormField
control={form.control}

View File

@@ -1,90 +1,286 @@
import datetime
import json
import os
from abc import ABC
import re
from typing import Union, Optional
from urllib.parse import quote, urlencode
import yt_dlp
import httpx
import requests
from pydantic import BaseModel
from app.downloaders.base import Downloader, DownloadQuality
from app.models.notes_model import AudioDownloadResult
from app.downloaders.base import Downloader
from app.downloaders.douyin_helper.abogus import ABogus
from app.enmus.note_enums import DownloadQuality
from app.models.audio_model import AudioDownloadResult
from app.utils.path_helper import get_data_dir
from dotenv import load_dotenv
load_dotenv()
DOUYIN_DOMAIN = "https://www.douyin.com"
class DouyinDownloader(Downloader, ABC):
def get_timestamp(unit: str = "milli"):
    """
    Return the time elapsed since the Unix epoch in the requested unit.

    Args:
        unit (str): One of "milli" (milliseconds), "sec" (seconds) or
            "min" (minutes).

    Returns:
        int: Elapsed time since 1970-01-01 00:00:00 UTC, truncated to an
            integer number of the requested unit.

    Raises:
        ValueError: If ``unit`` is not one of the supported values.
    """
    # Timezone-aware arithmetic: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to misuse.
    utc = datetime.timezone.utc
    epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
    elapsed = (datetime.datetime.now(utc) - epoch).total_seconds()

    if unit == "milli":
        return int(elapsed * 1000)
    if unit == "sec":
        return int(elapsed)
    if unit == "min":
        return int(elapsed / 60)
    raise ValueError("Unsupported time unit")
class DouyinConfig:
HEADERS = {
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36",
"Referer": "https://www.douyin.com/",
"Cookie": None
}
PROXIES = {
"http": None,
"https": None,
}
MS_TOKEN = {
"url": "https://mssdk.bytedance.com/web/report",
"magic": 538969122,
"version": 1,
"dataType": 8,
"strData": "fWOdJTQR3/jwmZqBBsPO6tdNEc1jX7YTwPg0Z8CT+j3HScLFbj2Zm1XQ7/lqgSutntVKLJWaY3Hc/+vc0h+So9N1t6EqiImu5jKyUa+S4NPy6cNP0x9CUQQgb4+RRihCgsn4QyV8jivEFOsj3N5zFQbzXRyOV+9aG5B5EAnwpn8C70llsWq0zJz1VjN6y2KZiBZRyonAHE8feSGpwMDeUTllvq6BG3AQZz7RrORLWNCLEoGzM6bMovYVPRAJipuUML4Hq/568bNb5vqAo0eOFpvTZjQFgbB7f/CtAYYmnOYlvfrHKBKvb0TX6AjYrw2qmNNEer2ADJosmT5kZeBsogDui8rNiI/OOdX9PVotmcSmHOLRfw1cYXTgwHXr6cJeJveuipgwtUj2FNT4YCdZfUGGyRDz5bR5bdBuYiSRteSX12EktobsKPksdhUPGGv99SI1QRVmR0ETdWqnKWOj/7ujFZsNnfCLxNfqxQYEZEp9/U01CHhWLVrdzlrJ1v+KJH9EA4P1Wo5/2fuBFVdIz2upFqEQ11DJu8LSyD43qpTok+hFG3Moqrr81uPYiyPHnUvTFgwA/TIE11mTc/pNvYIb8IdbE4UAlsR90eYvPkI+rK9KpYN/l0s9ti9sqTth12VAw8tzCQvhKtxevJRQntU3STeZ3coz9Dg8qkvaSNFWuBDuyefZBGVSgILFdMy33//l/eTXhQpFrVc9OyxDNsG6cvdFwu7trkAENHU5eQEWkFSXBx9Ml54+fa3LvJBoacfPViyvzkJworlHcYYTG392L4q6wuMSSpYUconb+0c5mwqnnLP6MvRdm/bBTaY2Q6RfJcCxyLW0xsJMO6fgLUEjAg/dcqGxl6gDjUVRWbCcG1NAwPCfmYARTuXQYbFc8LO+r6WQTWikO9Q7Cgda78pwH07F8bgJ8zFBbWmyrghilNXENNQkyIzBqOQ1V3w0WXF9+Z3vG3aBKCjIENqAQM9qnC14WMrQkfCHosGbQyEH0n/5R2AaVTE/ye2oPQBWG1m0Gfcgs/96f6yYrsxbDcSnMvsA+okyd6GfWsdZYTIK1E97PYHlncFeOjxySjPpfy6wJc4UlArJEBZYmgveo1SZAhmXl3pJY3yJa9CmYImWkhbpwsVkSmG3g11JitJXTGLIfqKXSAhh+7jg4HTKe+5KNir8xmbBI/DF8O/+diFAlD+BQd3cV0G4mEtCiPEhOvVLKV1pE+fv7nKJh0t38wNVdbs3qHtiQNN7JhY4uWZAosMuBXSjpEtoNUndI+o0cjR8XJ8tSFnrAY8XihiRzLMfeisiZxWCvVwIP3kum9MSHXma75cdCQGFBfFRj0jPn1JildrTh2vRgwG+KeDZ33BJ2VGw9PgRkztZ2l/W5d32jc7H91FftFFhwXil6sA23mr6nNp6CcrO7rOblcm5SzXJ5MA601+WVicC/g3p6A0lAnhjsm37qP+xGT+cbCFOfjexDYEhnqz0QZm94CCSnilQ9B/HBLhWOddp9GK0SABIk5i3xAH701Xb4HCcgAulvfO5EK0RL2eN4fb+CccgZQeO1Zzo4qsMHc13UG0saMgBEH8SqYlHz2S0CVHuDY5j1MSV0nsShjM01vIynw6K0T8kmEyNjt1eRGlleJ5lvE8vonJv7rAeaVRZ06rlYaxrMT6cK3RSHd2liE50Z3ik3xezwWoaY6zBXvCzljyEmqjNFgAPU3gI+N1vi0MsFmwAwFzYqqWdk3jwRoWLp//FnawQX0g5T64CnfAe/o2e/8o5/bvz83OsAAwZoR48GZzPu7KCIN9q4GBjyrePNx5Csq2srblifmzSKwF5MP/RLYsk6mEE15jpCMKOVlHcu0zhJybNP3AKMVllF6pvn+HWvUnLXNkt0A6zsfvjAva/tbLQiiiYi6vtheasIyDz3HpODlI+BCkV6V8lkTt7m8QJ1IcgTfqjQBummyjYTSwsQji3DdNCnlKYd13ZQa545utqu837
FFAzOZQhbnC3bKqeJqO2sE3m7WBUMbRWLflPRqp/PsklN+9jBPADKxKPl8g6/NZVq8fB1w68D5EJlGExdDhglo4B0aihHhb1u3+zJ2DqkxkPCGBAZ2AcuFIDzD53yS4NssoWb4HJ7YyzPaJro+tgG9TshWRBtUw8Or3m0OtQtX+rboYn3+GxvD1O8vWInrg5qxnepelRcQzmnor4rHF6ZNhAJZAf18Rjncra00HPJBugY5rD+EwnN9+mGQo43b01qBBRYEnxy9JJYuvXxNXxe47/MEPOw6qsxN+dmyIWZSuzkw8K+iBM/anE11yfU4qTFt0veCaVprK6tXaFK0ZhGXDOYJd70sjIP4UrPhatp8hqIXSJ2cwi70B+TvlDk/o19CA3bH6YxrAAVeag1P9hmNlfJ7NxK3Jp7+Ny1Vd7JHWVF+R6rSJiXXPfsXi3ZEy0klJAjI51NrDAnzNtgIQf0V8OWeEVv7F8Rsm3/GKnjdNOcDKymi9agZUgtctENWbCXGFnI40NHuVHtBRZeYAYtwfV7v6U0bP9s7uZGpkp+OETHMv3AyV0MVbZwQvarnjmct4Z3Vma+DvT+Z4VlMVnkC2x2FLt26K3SIMz+KV2XLv5ocEdPFSn1vMR7zruCWC8XqAG288biHo/soldmb/nlw8o8qlfZj4h296K3hfdFubGIUtqgsrZCrLCkkRC08Cv1ozEX/y6t2YrQepwiNmwDVk5IufStVvJMj+y2r9TcYLv7UKWXx3P6aySvM2ZHPaZhv+6Z/A/jIMBSvOizn4qG11iK7Oo6JYhxCSMJZsetjsnL4ecSIAufEmoFlAScWBh6nFArRpVLvkAZ3tej7H2lWFRXIU7x7mdBfGqU82PpM6znKMMZCpEsvHqpkSPSL+Kwz2z1f5wW7BKcKK4kNZ8iveg9VzY1NNjs91qU8DJpUnGyM04C7KNMpeilEmoOxvyelMQdi85ndOVmigVKmy5JYlODNX744sHpeqmMEK/ux3xY5O406lm7dZlyGPSMrFWbm4rzqvSEIskP43+9xVP8L84GeHE4RpOHg3qh/shx+/WnT1UhKuKpByHCpLoEo144udpzZswCYSMp58uPrlwdVF31//AacTRk8dUP3tBlnSQPa1eTpXWFCn7vIiqOTXaRL//YQK+e7ssrgSUnwhuGKJ8aqNDgdsL+haVZnV9g5Qrju643adyNixvYFEp0uxzOzVkekOMh2FYnFVIL2mJYGpZEXlAIC0zQbb54rSP89j0G7soJ2HcOkD0NmMEWj/7hUdTuMin1lRNde/qmHjwhbhqL8Z9MEO/YG3iLMgFTgSNQQhyE8AZAAKnehmzjORJfbK+qxyiJ07J843EDduzOoYt9p/YLqyTFmAgpdfK0uYrtAJ47cbl5WWhVXp5/XUxwWdL7TvQB0Xh6ir1/XBRcsVSDrR7cPE221ThmW1EPzD+SPf2L2gS0WromZqj1PhLgk92YnnR9s7/nLBXZHPKy+fDbJT16QqabFKqAl9G0blyf+R5UGX2kN+iQp4VGXEoH5lXxNNTlgRskzrW7KliQXcac20oimAHUE8Phf+rXXglpmSv4XN3eiwfXwvOaAMVjMRmRxsKitl5iZnwpcdbsC4jt16g2r/ihlKzLIYju+XZej4dNMlkftEidyNg24IVimJthXY1H15RZ8Hm7mAM/JZrsxiAVI0A49pWEiUk3cyZcBzq/vVEjHUy4r6IZnKkRvLjqsvqWE95nAGMor+F0GLHWfBCVkuI51EIOknwSB1eTvLgwgRepV4pdy9cdp6iR8TZndPVCikflXYVMlMEJ2bJ2c0Swiq57ORJW6vQwnkxtPudpFRc7tNNDzz4LKEznJxAwGi6pBR7/co2IUgRw1ijLFTHWHQJOjgc7KaduHI0C6a+BJb4Y8IWuIk2u2qCMF1HNKFAUn/J1gTcqtIJcvK5uykpfJFCYc899TmUc8LMKI9nu57m0S44Y2hPPYeW4XSakScsg8bJHMkc
Xk3Tbs9b4eqiD+kHUhTS2BGfsHadR3d5j8lNhBPzA5e+mE==",
"User-Agent": "5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.47"
}
TTWID = {
"url": "https://ttwid.bytedance.com/ttwid/union/register/",
"data": '{"region":"cn","aid":1768,"needFid":false,"service":"www.ixigua.com","migrate_info":{"ticket":"","source":"node"},"cbUrlProtocol":"https","union":true}'
}
class BaseRequestModel(BaseModel):
    """Default query parameters sent with Douyin web API requests.

    The defaults describe a desktop Chrome client on Windows. ``msToken``
    has no usable default; callers fill it in before building the query.
    """

    device_platform: str = "webapp"
    aid: str = "6383"
    channel: str = "channel_pc_web"
    pc_client_type: int = 1
    version_code: str = "290100"
    version_name: str = "29.1.0"
    cookie_enabled: str = "true"
    screen_width: int = 1920
    screen_height: int = 1080
    browser_language: str = "zh-CN"
    browser_platform: str = "Win32"
    browser_name: str = "Chrome"
    browser_version: str = "130.0.0.0"
    browser_online: str = "true"
    engine_name: str = "Blink"
    engine_version: str = "130.0.0.0"
    os_name: str = "Windows"
    os_version: str = "10"
    cpu_core_num: int = 12
    device_memory: int = 8
    platform: str = "PC"
    downlink: str = "10"
    effective_type: str = "4g"
    from_user_page: str = "1"
    locate_query: str = "false"
    need_time_list: str = "1"
    pc_libra_divert: str = "Windows"
    publish_video_strategy_type: str = "2"
    round_trip_time: str = "0"
    show_live_replay_strategy: str = "1"
    time_list_query: str = "0"
    whale_cut_token: str = ""
    update_version_code: str = "170400"
    # Declared Optional because the default is None; a plain ``str``
    # annotation would reject an explicitly passed None.
    msToken: Optional[str] = None
class DouyinDownloader(Downloader):
def __init__(self, cookie=None):
    """Prepare per-instance copies of the Douyin request configuration.

    Args:
        cookie: Cookie header value to send with requests. When empty or
            None, the ``DOUYIN_COOKIES`` environment variable is used.
    """
    super().__init__()
    # Copy the class-level dicts so per-instance changes do not leak into
    # the shared DouyinConfig defaults.
    self.headers_config = DouyinConfig.HEADERS.copy()
    # An explicitly supplied cookie wins; it was previously ignored.
    self.headers_config["Cookie"] = cookie or os.getenv('DOUYIN_COOKIES')
    # The headers are deliberately not printed: they contain the session
    # cookie, which must not end up in logs.
    self.proxies_config = DouyinConfig.PROXIES.copy()
    self.ttwid_config = DouyinConfig.TTWID.copy()
    self.ms_token_config = DouyinConfig.MS_TOKEN.copy()
@staticmethod
def find_url(string: str) -> list:
url = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', string)
return url
def extract_video_id(self, url: str) -> str:
    """Resolve a Douyin share text or link to its numeric video id.

    The first URL found in ``url`` is followed through its redirects with
    a HEAD request, and the id is read from the final address.

    Args:
        url: A Douyin link, or share text containing one.

    Returns:
        The video id as a string, or "" when no URL is present, the
        request fails, or the resolved address carries no id.
    """
    candidates = self.find_url(url)
    if not candidates:
        # Nothing to resolve; previously this fell through to
        # requests.head() with a list and relied on it raising.
        return ""

    try:
        # A timeout keeps a stalled short-link redirect from hanging the task.
        response = requests.head(candidates[0], allow_redirects=True, timeout=10)
    except (requests.RequestException, ValueError):
        # Best effort: any transport or URL problem means "no id".
        return ""

    resolved_url = response.url
    for pattern in (r'video/(\d+)', r'aweme_id=(\d+)'):
        match = re.search(pattern, resolved_url)
        if match:
            return match.group(1)
    return ""
def gen_real_msToken(self) -> str:
    """Request an ``msToken`` cookie from the configured msToken endpoint.

    Posts the fixed payload from ``self.ms_token_config`` plus the current
    millisecond timestamp, and reads ``msToken`` from the response cookies.

    Returns:
        The msToken string (120 or 128 characters long).

    Raises:
        ValueError: If the request fails or the returned token does not
            have an accepted length. The original error is chained as the
            cause.
    """
    payload = json.dumps(
        {
            "magic": self.ms_token_config["magic"],
            "version": self.ms_token_config["version"],
            "dataType": self.ms_token_config["dataType"],
            "strData": self.ms_token_config["strData"],
            "tspFromClient": get_timestamp(),
        }
    )
    headers = {
        "User-Agent": self.headers_config["User-Agent"],
        "Content-Type": "application/json",
    }
    transport = httpx.HTTPTransport(retries=5)
    with httpx.Client(transport=transport) as client:
        try:
            response = client.post(
                self.ms_token_config["url"], content=payload, headers=headers
            )
            response.raise_for_status()
            # A missing cookie becomes the string "None", which fails the
            # length check below.
            msToken = str(httpx.Cookies(response.cookies).get("msToken"))
            if len(msToken) not in [120, 128]:
                raise ValueError("响应内容:{0} Douyin msToken API 的响应内容不符合要求。".format(msToken))
            return msToken
        except Exception as e:
            # Every failure is normalized to ValueError for callers; chain
            # the cause so the underlying traceback is kept.
            raise ValueError("Douyin msToken API 请求失败:{0}".format(e)) from e
def fetch_video_info(self, video_url: str) -> dict:
    """Fetch the Douyin "aweme detail" JSON for a video.

    Builds the signed detail-API URL (default query parameters, a fresh
    msToken, the video id and an ``a_bogus`` signature) and returns the
    decoded JSON response.

    Args:
        video_url: A Douyin link, or share text containing one.

    Returns:
        The decoded JSON body of the detail API response.

    Raises:
        ValueError: If no video id can be extracted, the msToken request
            fails, or the detail request / JSON decoding fails.
    """
    aweme_id = self.extract_video_id(video_url)
    if not aweme_id:
        # Fail early instead of querying the API with an empty id.
        raise ValueError(f"无法从链接中解析抖音视频 ID: {video_url}")

    # The headers carry the session cookie, so they are not printed.
    headers = self.headers_config
    base_params = BaseRequestModel().model_dump()
    base_params["msToken"] = self.gen_real_msToken()
    base_params["aweme_id"] = aweme_id
    # The signature is computed over the parameter set that is sent.
    ab_value = ABogus().get_value(base_params)
    a_bogus = quote(ab_value, safe='')
    print(base_params)
    query_str = urlencode(base_params)
    full_url = f"{DOUYIN_DOMAIN}/aweme/v1/web/aweme/detail/?{query_str}&a_bogus={a_bogus}"
    print("Request URL:", full_url)
    try:
        response = requests.get(full_url, headers=headers, timeout=15)
        print("Response JSON:", response.content)
        return response.json()
    except Exception as e:
        print("请求失败:", e)
        raise ValueError("请求失败:", e) from e
def download(
self,
video_url: str,
output_dir: Union[str, None] = None,
quality: DownloadQuality = "fast",
need_video:Optional[bool]=False
self,
video_url: str,
output_dir: Union[str, None] = None,
quality: DownloadQuality = "fast",
need_video: Optional[bool] = False
) -> AudioDownloadResult:
print(
f"正在下载视频: {video_url},保存路径: {output_dir},质量: {quality}"
)
if output_dir is None:
output_dir = get_data_dir()
if not output_dir:
output_dir = self.cache_data
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "%(id)s.%(ext)s")
ydl_opts = {
'format': 'bestaudio[ext=m4a]/bestaudio/best',
'outtmpl': output_path,
'postprocessors': [
{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '64',
}
],
'noplaylist': True,
'quiet': False,
video_data = self.fetch_video_info(video_url)
output_path = output_path % {
"id": video_data['aweme_detail']['aweme_id'],
"ext": "mp3",
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=True)
video_id = info.get("id")
title = info.get("title")
duration = info.get("duration", 0)
cover_url = info.get("thumbnail")
audio_path = os.path.join(output_dir, f"{video_id}.mp3")
url = video_data['aweme_detail']['music']['play_url']['uri']
# 下载音频
audio_data = requests.get(url)
with open(output_path, 'wb') as f:
f.write(audio_data.content)
print(url)
tags = []
for tag in video_data['aweme_detail']['video_tag']:
if tag['tag_name']:
tags.append(tag['tag_name'])
return AudioDownloadResult(
file_path=audio_path,
title=title,
duration=duration,
cover_url=cover_url,
file_path=output_path,
title=video_data['aweme_detail']['item_title'],
duration=video_data['aweme_detail']['video']['duration'],
cover_url=video_data['aweme_detail']['video']['cover_original_scale']['url_list'][0] if
video_data['aweme_detail']['video']['cover'] else video_data['video']['big_thumbs']['img_url'],
platform="douyin",
video_id=video_id,
raw_info={'tags':info.get('tags')}, #全部返回会报错
video_id=video_data['aweme_detail']['aweme_id'],
raw_info={
'tags': video_data['aweme_detail']['caption'] + ''.join(tags),
},
video_path=None # ❗音频下载不包含视频路径
)
def download_video(
self,
video_url: str,
output_dir: Union[str, None] = None,
) -> str:
"""
下载视频,返回视频文件路径
"""
if output_dir is None:
output_dir = get_data_dir()
def download_video(self, video_url: str, output_dir: Union[str, None] = None) -> str:
try:
if output_dir is None:
output_dir = get_data_dir()
if not output_dir:
output_dir = self.cache_data
os.makedirs(output_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, "%(id)s.%(ext)s")
output_path = os.path.join(output_dir, "%(id)s.%(ext)s")
ydl_opts = {
'format': 'worst[ext=mp4]/worst',
'outtmpl': output_path,
'noplaylist': True,
'quiet': False,
'merge_output_format': 'mp4', # 确保合并成 mp4
}
video_data = self.fetch_video_info(video_url)
output_path = output_path % {
"id": video_data['aweme_detail']['aweme_id'],
"ext": "mp4",
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=True)
video_id = info.get("id")
video_path = os.path.join(output_dir, f"{video_id}.mp4")
url=video_data['aweme_detail']['video']['download_addr']['url_list'][0]
_data = requests.get(url,allow_redirects=True,headers=self.headers_config)
if not os.path.exists(video_path):
raise FileNotFoundError(f"视频文件未找到: {video_path}")
with open(output_path, 'wb') as f:
f.write(_data.content)
return video_path
return output_path
except Exception as e:
print("请求失败:", e)
raise ValueError("请求失败:", e)
if __name__ == '__main__':
    # Manual smoke test: download the audio for a sample Douyin share text.
    share_text = '7.43 11/16 gba:/ j@P.xS 以“马成钢”的视角打开《抓娃娃》笼中鸟,何时飞 # 独白 # 人物故事 https://v.douyin.com/0pcFVdG_lx4/ 复制此链接打开Dou音搜索直接观看视频'
    dy = DouyinDownloader(cookie='')
    dy.download(share_text)

View File

@@ -0,0 +1,635 @@
"""
Original Author:
This file is from https://github.com/JoeanAmier/TikTokDownloader
And is licensed under the GNU General Public License v3.0
If you use this code, please keep this license and the original author information.
Modified by:
And this file is now a part of the https://github.com/Evil0ctal/Douyin_TikTok_Download_API open-source project.
This project is licensed under the Apache License 2.0, and the original author information is kept.
Purpose:
This file is used to generate the `a_bogus` parameter for the Douyin Web API.
Changes Made:
1. Changed the ua_code to be compatible with the User-Agent string in the current config file: https://github.com/Evil0ctal/Douyin_TikTok_Download_API/blob/main/crawlers/douyin/web/config.yaml
"""
from random import choice
from random import randint
from random import random
from re import compile
from time import time
from urllib.parse import urlencode
from urllib.parse import quote
from gmssl import sm3, func
__all__ = ["ABogus", ]
class ABogus:
__filter = compile(r'%([0-9A-F]{2})')
__arguments = [0, 1, 14]
__ua_key = "\u0000\u0001\u000e"
__end_string = "cus"
__version = [1, 0, 1, 5]
__browser = "1536|742|1536|864|0|0|0|0|1536|864|1536|864|1536|742|24|24|MacIntel"
__reg = [
1937774191,
1226093241,
388252375,
3666478592,
2842636476,
372324522,
3817729613,
2969243214,
]
__str = {
"s0": "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=",
"s1": "Dkdpgh4ZKsQB80/Mfvw36XI1R25+WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=",
"s2": "Dkdpgh4ZKsQB80/Mfvw36XI1R25-WUAlEi7NLboqYTOPuzmFjJnryx9HVGcaStCe=",
"s3": "ckdp1h4ZKsUB80/Mfvw36XIgR25+WQAlEi7NLboqYTOPuzmFjJnryx9HVGDaStCe",
"s4": "Dkdpgh2ZmsQB80/MfvV36XI1R45-WUAlEixNLwoqYTOPuzKFjJnry79HbGcaStCe",
}
def __init__(self,
# user_agent: str = USERAGENT,
platform: str = None, ):
self.chunk = []
self.size = 0
self.reg = self.__reg[:]
# self.ua_code = self.generate_ua_code(user_agent)
# Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36
self.ua_code = [
76,
98,
15,
131,
97,
245,
224,
133,
122,
199,
241,
166,
79,
34,
90,
191,
128,
126,
122,
98,
66,
11,
14,
40,
49,
110,
110,
173,
67,
96,
138,
252]
self.browser = self.generate_browser_info(
platform) if platform else self.__browser
self.browser_len = len(self.browser)
self.browser_code = self.char_code_at(self.browser)
@classmethod
def list_1(cls, random_num=None, a=170, b=85, c=45, ) -> list:
return cls.random_list(
random_num,
a,
b,
1,
2,
5,
c & a,
)
@classmethod
def list_2(cls, random_num=None, a=170, b=85, ) -> list:
return cls.random_list(
random_num,
a,
b,
1,
0,
0,
0,
)
@classmethod
def list_3(cls, random_num=None, a=170, b=85, ) -> list:
return cls.random_list(
random_num,
a,
b,
1,
0,
5,
0,
)
@staticmethod
def random_list(
a: float = None,
b=170,
c=85,
d=0,
e=0,
f=0,
g=0,
) -> list:
r = a or (random() * 10000)
v = [
r,
int(r) & 255,
int(r) >> 8,
]
s = v[1] & b | d
v.append(s)
s = v[1] & c | e
v.append(s)
s = v[2] & b | f
v.append(s)
s = v[2] & c | g
v.append(s)
return v[-4:]
@staticmethod
def from_char_code(*args):
return "".join(chr(code) for code in args)
@classmethod
def generate_string_1(
cls,
random_num_1=None,
random_num_2=None,
random_num_3=None,
):
return cls.from_char_code(*cls.list_1(random_num_1)) + cls.from_char_code(
*cls.list_2(random_num_2)) + cls.from_char_code(*cls.list_3(random_num_3))
def generate_string_2(
self,
url_params: str,
method="GET",
start_time=0,
end_time=0,
) -> str:
a = self.generate_string_2_list(
url_params,
method,
start_time,
end_time,
)
e = self.end_check_num(a)
a.extend(self.browser_code)
a.append(e)
return self.rc4_encrypt(self.from_char_code(*a), "y")
def generate_string_2_list(
self,
url_params: str,
method="GET",
start_time=0,
end_time=0,
) -> list:
start_time = start_time or int(time() * 1000)
end_time = end_time or (start_time + randint(4, 8))
params_array = self.generate_params_code(url_params)
method_array = self.generate_method_code(method)
return self.list_4(
(end_time >> 24) & 255,
params_array[21],
self.ua_code[23],
(end_time >> 16) & 255,
params_array[22],
self.ua_code[24],
(end_time >> 8) & 255,
(end_time >> 0) & 255,
(start_time >> 24) & 255,
(start_time >> 16) & 255,
(start_time >> 8) & 255,
(start_time >> 0) & 255,
method_array[21],
method_array[22],
int(end_time / 256 / 256 / 256 / 256) >> 0,
int(start_time / 256 / 256 / 256 / 256) >> 0,
self.browser_len,
)
@staticmethod
def reg_to_array(a):
o = [0] * 32
for i in range(8):
c = a[i]
o[4 * i + 3] = (255 & c)
c >>= 8
o[4 * i + 2] = (255 & c)
c >>= 8
o[4 * i + 1] = (255 & c)
c >>= 8
o[4 * i] = (255 & c)
return o
def compress(self, a):
f = self.generate_f(a)
i = self.reg[:]
for o in range(64):
c = self.de(i[0], 12) + i[4] + self.de(self.pe(o), o)
c = (c & 0xFFFFFFFF)
c = self.de(c, 7)
s = (c ^ self.de(i[0], 12)) & 0xFFFFFFFF
u = self.he(o, i[0], i[1], i[2])
u = (u + i[3] + s + f[o + 68]) & 0xFFFFFFFF
b = self.ve(o, i[4], i[5], i[6])
b = (b + i[7] + c + f[o]) & 0xFFFFFFFF
i[3] = i[2]
i[2] = self.de(i[1], 9)
i[1] = i[0]
i[0] = u
i[7] = i[6]
i[6] = self.de(i[5], 19)
i[5] = i[4]
i[4] = (b ^ self.de(b, 9) ^ self.de(b, 17)) & 0xFFFFFFFF
for l in range(8):
self.reg[l] = (self.reg[l] ^ i[l]) & 0xFFFFFFFF
@classmethod
def generate_f(cls, e):
r = [0] * 132
for t in range(16):
r[t] = (e[4 * t] << 24) | (e[4 * t + 1] <<
16) | (e[4 * t + 2] << 8) | e[4 * t + 3]
r[t] &= 0xFFFFFFFF
for n in range(16, 68):
a = r[n - 16] ^ r[n - 9] ^ cls.de(r[n - 3], 15)
a = a ^ cls.de(a, 15) ^ cls.de(a, 23)
r[n] = (a ^ cls.de(r[n - 13], 7) ^ r[n - 6]) & 0xFFFFFFFF
for n in range(68, 132):
r[n] = (r[n - 68] ^ r[n - 64]) & 0xFFFFFFFF
return r
@staticmethod
def pad_array(arr, length=60):
while len(arr) < length:
arr.append(0)
return arr
def fill(self, length=60):
size = 8 * self.size
self.chunk.append(128)
self.chunk = self.pad_array(self.chunk, length)
for i in range(4):
self.chunk.append((size >> 8 * (3 - i)) & 255)
@staticmethod
def list_4(
a: int,
b: int,
c: int,
d: int,
e: int,
f: int,
g: int,
h: int,
i: int,
j: int,
k: int,
m: int,
n: int,
o: int,
p: int,
q: int,
r: int,
) -> list:
return [
44,
a,
0,
0,
0,
0,
24,
b,
n,
0,
c,
d,
0,
0,
0,
1,
0,
239,
e,
o,
f,
g,
0,
0,
0,
0,
h,
0,
0,
14,
i,
j,
0,
k,
m,
3,
p,
1,
q,
1,
r,
0,
0,
0]
@staticmethod
def end_check_num(a: list):
r = 0
for i in a:
r ^= i
return r
@classmethod
def decode_string(cls, url_string, ):
decoded = cls.__filter.sub(cls.replace_func, url_string)
return decoded
@staticmethod
def replace_func(match):
return chr(int(match.group(1), 16))
@staticmethod
def de(e, r):
r %= 32
return ((e << r) & 0xFFFFFFFF) | (e >> (32 - r))
@staticmethod
def pe(e):
return 2043430169 if 0 <= e < 16 else 2055708042
@staticmethod
def he(e, r, t, n):
if 0 <= e < 16:
return (r ^ t ^ n) & 0xFFFFFFFF
elif 16 <= e < 64:
return (r & t | r & n | t & n) & 0xFFFFFFFF
raise ValueError
@staticmethod
def ve(e, r, t, n):
if 0 <= e < 16:
return (r ^ t ^ n) & 0xFFFFFFFF
elif 16 <= e < 64:
return (r & t | ~r & n) & 0xFFFFFFFF
raise ValueError
@staticmethod
def convert_to_char_code(a):
d = []
for i in a:
d.append(ord(i))
return d
@staticmethod
def split_array(arr, chunk_size=64):
result = []
for i in range(0, len(arr), chunk_size):
result.append(arr[i:i + chunk_size])
return result
@staticmethod
def char_code_at(s):
return [ord(char) for char in s]
def write(self, e, ):
self.size = len(e)
if isinstance(e, str):
e = self.decode_string(e)
e = self.char_code_at(e)
if len(e) <= 64:
self.chunk = e
else:
chunks = self.split_array(e, 64)
for i in chunks[:-1]:
self.compress(i)
self.chunk = chunks[-1]
def reset(self, ):
self.chunk = []
self.size = 0
self.reg = self.__reg[:]
def sum(self, e, length=60):
self.reset()
self.write(e)
self.fill(length)
self.compress(self.chunk)
return self.reg_to_array(self.reg)
@classmethod
def generate_result_unit(cls, n, s):
r = ""
for i, j in zip(range(18, -1, -6), (16515072, 258048, 4032, 63)):
r += cls.__str[s][(n & j) >> i]
return r
@classmethod
def generate_result_end(cls, s, e="s4"):
r = ""
b = ord(s[120]) << 16
r += cls.__str[e][(b & 16515072) >> 18]
r += cls.__str[e][(b & 258048) >> 12]
r += "=="
return r
@classmethod
def generate_result(cls, s, e="s4"):
# r = ""
# for i in range(len(s)//4):
# b = ((ord(s[i * 3]) << 16) | (ord(s[i * 3 + 1]))
# << 8) | ord(s[i * 3 + 2])
# r += cls.generate_result_unit(b, e)
# return r
r = []
for i in range(0, len(s), 3):
if i + 2 < len(s):
n = (
(ord(s[i]) << 16)
| (ord(s[i + 1]) << 8)
| ord(s[i + 2])
)
elif i + 1 < len(s):
n = (ord(s[i]) << 16) | (
ord(s[i + 1]) << 8
)
else:
n = ord(s[i]) << 16
for j, k in zip(range(18, -1, -6),
(0xFC0000, 0x03F000, 0x0FC0, 0x3F)):
if j == 6 and i + 1 >= len(s):
break
if j == 0 and i + 2 >= len(s):
break
r.append(cls.__str[e][(n & k) >> j])
r.append("=" * ((4 - len(r) % 4) % 4))
return "".join(r)
@classmethod
def generate_args_code(cls):
a = []
for j in range(24, -1, -8):
a.append(cls.__arguments[0] >> j)
a.append(cls.__arguments[1] / 256)
a.append(cls.__arguments[1] % 256)
a.append(cls.__arguments[1] >> 24)
a.append(cls.__arguments[1] >> 16)
for j in range(24, -1, -8):
a.append(cls.__arguments[2] >> j)
return [int(i) & 255 for i in a]
def generate_method_code(self, method: str = "GET") -> list[int]:
return self.sm3_to_array(self.sm3_to_array(method + self.__end_string))
# return self.sum(self.sum(method + self.__end_string))
def generate_params_code(self, params: str) -> list[int]:
return self.sm3_to_array(self.sm3_to_array(params + self.__end_string))
# return self.sum(self.sum(params + self.__end_string))
@classmethod
def sm3_to_array(cls, data: str | list) -> list[int]:
"""
代码参考: https://github.com/Johnserf-Seed/f2/blob/main/f2/utils/abogus.py
计算请求体的 SM3 哈希值,并将结果转换为整数数组
Calculate the SM3 hash value of the request body and convert the result to an array of integers
Args:
data (Union[str, List[int]]): 输入数据 (Input data).
Returns:
List[int]: 哈希值的整数数组 (Array of integers representing the hash value).
"""
if isinstance(data, str):
b = data.encode("utf-8")
else:
b = bytes(data) # 将 List[int] 转换为字节数组
# 将字节数组转换为适合 sm3.sm3_hash 函数处理的列表格式
h = sm3.sm3_hash(func.bytes_to_list(b))
# 将十六进制字符串结果转换为十进制整数列表
return [int(h[i: i + 2], 16) for i in range(0, len(h), 2)]
@classmethod
def generate_browser_info(cls, platform: str = "Win32") -> str:
inner_width = randint(1280, 1920)
inner_height = randint(720, 1080)
outer_width = randint(inner_width, 1920)
outer_height = randint(inner_height, 1080)
screen_x = 0
screen_y = choice((0, 30))
value_list = [
inner_width,
inner_height,
outer_width,
outer_height,
screen_x,
screen_y,
0,
0,
outer_width,
outer_height,
outer_width,
outer_height,
inner_width,
inner_height,
24,
24,
platform,
]
return "|".join(str(i) for i in value_list)
@staticmethod
def rc4_encrypt(plaintext, key):
s = list(range(256))
j = 0
for i in range(256):
j = (j + s[i] + ord(key[i % len(key)])) % 256
s[i], s[j] = s[j], s[i]
i = 0
j = 0
cipher = []
for k in range(len(plaintext)):
i = (i + 1) % 256
j = (j + s[i]) % 256
s[i], s[j] = s[j], s[i]
t = (s[i] + s[j]) % 256
cipher.append(chr(s[t] ^ ord(plaintext[k])))
return ''.join(cipher)
def get_value(self,
url_params: dict | str,
method="GET",
start_time=0,
end_time=0,
random_num_1=None,
random_num_2=None,
random_num_3=None,
) -> str:
string_1 = self.generate_string_1(
random_num_1,
random_num_2,
random_num_3,
)
string_2 = self.generate_string_2(urlencode(url_params) if isinstance(
url_params, dict) else url_params, method, start_time, end_time, )
string = string_1 + string_2
# return self.generate_result(
# string, "s4") + self.generate_result_end(string, "s4")
return self.generate_result(string, "s4")
if __name__ == "__main__":
    # Demo: sign the query string of a sample detail-API URL and print it.
    USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
    url_str = "https://www.douyin.com/aweme/v1/web/aweme/detail/?device_platform=webapp&aid=6383&channel=channel_pc_web&pc_client_type=1&version_code=190500&version_name=19.5.0&cookie_enabled=true&browser_language=zh-CN&browser_platform=Win32&browser_name=Firefox&browser_online=true&engine_name=Gecko&os_name=Windows&os_version=10&platform=PC&screen_width=1920&screen_height=1080&browser_version=124.0&engine_version=122.0.0.0&cpu_core_num=12&device_memory=8&aweme_id=7345492945006595379"
    signer = ABogus()
    # Turn the query string into a {name: value} dict.
    query_string = url_str.split("?")[1]
    url_params = dict(pair.split("=") for pair in query_string.split("&"))
    print(f"URL参数: {url_params}")
    # URL-encode the signature so it can be appended to the query string.
    a_bogus = quote(signer.get_value(url_params), safe='')
    print(a_bogus)
    print(USERAGENT)

View File

@@ -6,7 +6,7 @@ from typing import Optional
from urllib.parse import urlparse
from fastapi import APIRouter, HTTPException, BackgroundTasks, UploadFile, File
from pydantic import BaseModel, validator
from pydantic import BaseModel, validator, field_validator
from dataclasses import asdict
from app.db.video_task_dao import get_task_by_video
@@ -44,7 +44,7 @@ class VideoRequest(BaseModel):
style:str=None
extras:Optional[str]
@validator("video_url")
@field_validator("video_url")
def validate_supported_url(cls, v):
url = str(v)
parsed = urlparse(url)
@@ -52,11 +52,7 @@ class VideoRequest(BaseModel):
# 是网络链接,继续用原有平台校验
if not is_supported_video_url(url):
raise ValueError("暂不支持该视频平台或链接格式无效")
else:
# 是本地路径,检测一下文件是否存在
if not url.startswith('/uploads') and not os.path.exists(url):
raise ValueError("本地文件路径不存在")
return v

View File

@@ -185,7 +185,6 @@ class NoteGenerator:
try:
logger.info(f"🎯 开始解析并生成笔记task_id={task_id}")
self.update_task_status(task_id, TaskStatus.PARSING)
_path=''
downloader = self.get_downloader(platform)
gpt = self.get_gpt(model_name=model_name, provider_id=provider_id)
@@ -213,8 +212,6 @@ class NoteGenerator:
output_dir=path,
need_video=screenshot
)
_path=audio.raw_info.get('path')
print('_path',_path)
with open(audio_cache_path, "w", encoding="utf-8") as f:
json.dump(asdict(audio), f, ensure_ascii=False, indent=2)
logger.info(f"音频下载并缓存成功task_id={task_id}")

View File

@@ -1,4 +1,6 @@
import shutil
from pathlib import Path
from dotenv import load_dotenv
import subprocess
import os
@@ -14,22 +16,29 @@ def generate_screenshot(video_path: str, output_dir: str, timestamp: int, index:
"""
使用 ffmpeg 生成截图,返回生成图片路径
"""
os.makedirs(output_dir, exist_ok=True)
ids=str(uuid.uuid4())
output_path = os.path.join(output_dir, f"screenshot_{str(index)+ids}.jpg")
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
filename = f"screenshot_{index:03}_{uuid.uuid4()}.jpg"
output_path = output_dir / filename
command = [
"ffmpeg",
"-ss", str(timestamp),
"-i", video_path,
"-i", str(video_path),
"-frames:v", "1",
"-q:v", "2", # 图像质量
output_path,
"-y" # 覆盖
"-q:v", "2",
str(output_path),
"-y"
]
subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return output_path
print("Running command:", command)
result = subprocess.run(command, capture_output=True, text=True)
if result.returncode != 0:
print("ffmpeg failed:", result.stderr)
return str(output_path)

View File

@@ -4,7 +4,8 @@ import re
# Regular expression per supported platform, keyed by platform name.
SUPPORTED_PLATFORMS = {
    "bilibili": r"(https?://)?(www\.)?bilibili\.com/video/[a-zA-Z0-9]+",
    "youtube": r"(https?://)?(www\.)?(youtube\.com/watch\?v=|youtu\.be/)[\w\-]+",
    # Douyin share text embeds a short link (e.g. https://v.douyin.com/xxx/),
    # so this pattern accepts a generic http(s) URL.
    # NOTE(review): it matches URLs of any host, not only douyin.com —
    # confirm that this breadth is intended.
    # Fixed: the previous literal began with a stray "'" and never closed the
    # outer "(?:" group, so it could not be compiled.
    "douyin": r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+",
}