Merge pull request #2684 from InfinityPacer/dev

This commit is contained in:
jxxghp
2024-09-05 06:54:24 +08:00
committed by GitHub
3 changed files with 137 additions and 13 deletions

View File

@@ -15,7 +15,7 @@ from app.modules.douban.douban_cache import DoubanCache
from app.modules.douban.scraper import DoubanScraper
from app.schemas import MediaPerson, APIRateLimitException
from app.schemas.types import MediaType
from app.utils.common import retry
from app.utils.common import retry, rate_limit_handler
from app.utils.http import RequestUtils
@@ -145,6 +145,7 @@ class DoubanModule(_ModuleBase):
return None
@rate_limit_handler(backoff_factor=2, source="douban_info", raise_on_limit=False)
def douban_info(self, doubanid: str, mtype: MediaType = None, raise_exception: bool = True) -> Optional[dict]:
"""
获取豆瓣信息
@@ -428,9 +429,7 @@ class DoubanModule(_ModuleBase):
if "subject_ip_rate_limit" in info.get("msg", ""):
msg = f"触发豆瓣IP速率限制错误信息{info} ..."
logger.warn(msg)
if raise_exception:
raise APIRateLimitException(msg)
return None
raise APIRateLimitException(msg)
celebrities = self.doubanapi.tv_celebrities(doubanid)
if celebrities:
info["directors"] = celebrities.get("directors")
@@ -446,9 +445,7 @@ class DoubanModule(_ModuleBase):
if "subject_ip_rate_limit" in info.get("msg", ""):
msg = f"触发豆瓣IP速率限制错误信息{info} ..."
logger.warn(msg)
if raise_exception:
raise APIRateLimitException(msg)
return None
raise APIRateLimitException(msg)
celebrities = self.doubanapi.movie_celebrities(doubanid)
if celebrities:
info["directors"] = celebrities.get("directors")
@@ -605,6 +602,7 @@ class DoubanModule(_ModuleBase):
return []
@retry(Exception, 5, 3, 3, logger=logger)
@rate_limit_handler(source="match_doubaninfo", raise_on_limit=False)
def match_doubaninfo(self, name: str, imdbid: str = None,
mtype: MediaType = None, year: str = None, season: int = None,
raise_exception: bool = False) -> dict:
@@ -638,9 +636,7 @@ class DoubanModule(_ModuleBase):
if "search_access_rate_limit" in result.values():
msg = f"触发豆瓣API速率限制错误信息{result} ..."
logger.warn(msg)
if raise_exception:
raise APIRateLimitException(msg)
return {}
raise APIRateLimitException(msg)
if not result.get("items"):
logger.warn(f"未找到 {name} 的豆瓣信息")
return {}

View File

@@ -6,9 +6,26 @@ class ImmediateException(Exception):
pass
class APIRateLimitException(ImmediateException):
class LimitException(ImmediateException):
"""
用于表示本地限流器或外部触发的限流异常的基类。
该异常类可用于本地限流逻辑或外部限流处理。
"""
pass
class APIRateLimitException(LimitException):
"""
用于表示API速率限制的异常类。
当API调用触发速率限制时可以抛出此异常以立即终止操作并报告错误。
"""
pass
class RateLimitExceededException(LimitException):
"""
用于表示本地限流器触发的异常类。
当函数调用频率超过限流器的限制时,可以抛出此异常以停止当前操作并告知调用者限流情况。
这个异常通常用于本地限流逻辑(例如 RateLimiter当系统检测到函数调用频率过高时触发限流并抛出该异常。
"""
pass

View File

@@ -1,7 +1,10 @@
import functools
import threading
import time
from typing import Any
from typing import Any, Callable, Optional, Tuple
from app.schemas import ImmediateException
from app.log import logger
from app.schemas import ImmediateException, RateLimitExceededException, LimitException
def retry(ExceptionToCheck: Any,
@@ -36,3 +39,111 @@ def retry(ExceptionToCheck: Any,
return f_retry
return deco_retry
class RateLimiter:
"""
限流器类,用于处理调用的限流逻辑
通过增加等待时间逐步减少调用的频率,以避免触发限流
"""
def __init__(self, base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0, source: str = ""):
"""
初始化 RateLimiter 实例
:param base_wait: 基础等待时间(秒),默认值为 60 秒1 分钟)
:param max_wait: 最大等待时间(秒),默认值为 600 秒10 分钟)
:param backoff_factor: 等待时间的递增倍数,默认值为 2.0,表示指数退避
:param source: 业务来源或上下文信息,默认值为 ""
"""
self.next_allowed_time = 0
self.current_wait = base_wait
self.base_wait = base_wait
self.max_wait = max_wait
self.backoff_factor = backoff_factor
self.source = source
self.lock = threading.Lock()
def can_call(self) -> Tuple[bool, str]:
"""
检查是否可以进行下一次调用
:return: 如果当前时间超过下一次允许调用的时间,返回 True否则返回 False
"""
current_time = time.time()
with self.lock:
if current_time >= self.next_allowed_time:
return True, ""
wait_time = self.next_allowed_time - current_time
message = self.format_log(f"限流期间,跳过调用,将在 {wait_time:.2f} 秒后允许继续调用")
logger.info(message)
return False, message
def reset(self):
"""
重置等待时间
当调用成功时调用此方法,重置当前等待时间为基础等待时间
"""
with self.lock:
if self.next_allowed_time != 0 or self.current_wait > self.base_wait:
logger.info(self.format_log(f"调用成功,重置限流等待时间为{self.base_wait}"))
self.next_allowed_time = 0
self.current_wait = self.base_wait
def trigger_limit(self):
"""
触发限流
当触发限流异常时调用此方法,增加下一次允许调用的时间并更新当前等待时间
"""
current_time = time.time()
with self.lock:
self.next_allowed_time = current_time + self.current_wait
logger.warn(self.format_log(f"触发限流,将在 {self.current_wait} 秒后允许继续调用"))
self.current_wait = min(self.current_wait * self.backoff_factor, self.max_wait)
def format_log(self, message: str) -> str:
"""
格式化日志消息
:param message: 日志内容
:return: 格式化后的日志消息
"""
return f"[{self.source}] {message}" if self.source else message
def rate_limit_handler(base_wait: int = 60, max_wait: int = 600, backoff_factor: float = 2.0,
raise_on_limit: bool = True, source: str = "") -> Callable:
"""
装饰器,用于处理限流逻辑,支持动态控制是否在限流时抛出异常
:param base_wait: 基础等待时间(秒),默认值为 60 秒1 分钟)
:param max_wait: 最大等待时间(秒),默认值为 600 秒10 分钟)
:param backoff_factor: 等待时间的递增倍数,默认值为 2.0
:param raise_on_limit: 控制默认情况下是否在限流时抛出异常,默认为 True限流时抛出异常
如果在函数调用时传入 `raise_exception` 参数,则以传入值为准。
:param source: 业务来源或上下文信息,默认为 ""
:return: 装饰器函数
"""
rate_limiter = RateLimiter(base_wait, max_wait, backoff_factor, source)
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Optional[Any]:
# 动态检查是否传入了 raise_exception否则使用默认的 raise_on_limit
raise_exception = kwargs.get("raise_exception", raise_on_limit)
can_call, message = rate_limiter.can_call()
if not can_call:
if raise_exception:
raise RateLimitExceededException(message)
return None
try:
result = func(*args, **kwargs)
rate_limiter.reset()
return result
except LimitException as e:
rate_limiter.trigger_limit()
logger.error(rate_limiter.format_log(f"触发限流:{str(e)}"))
if raise_exception:
raise e
return None
return wrapper
return decorator