mirror of
https://github.com/isboyjc/GoProxy.git
synced 2026-05-07 06:12:41 +08:00
230 lines
6.4 KiB
Go
230 lines
6.4 KiB
Go
package pool
|
||
|
||
import (
|
||
"log"
|
||
|
||
"goproxy/config"
|
||
"goproxy/storage"
|
||
)
|
||
|
||
// Manager 池子管理器
|
||
type Manager struct {
|
||
storage *storage.Storage
|
||
cfg *config.Config
|
||
}
|
||
|
||
func NewManager(s *storage.Storage, cfg *config.Config) *Manager {
|
||
return &Manager{
|
||
storage: s,
|
||
cfg: cfg,
|
||
}
|
||
}
|
||
|
||
// PoolStatus 池子状态
|
||
type PoolStatus struct {
|
||
Total int
|
||
HTTP int
|
||
SOCKS5 int
|
||
HTTPSlots int
|
||
SOCKS5Slots int
|
||
State string // healthy/warning/critical/emergency
|
||
AvgLatencyHTTP int
|
||
AvgLatencySocks5 int
|
||
}
|
||
|
||
// GetStatus 获取当前池子状态
|
||
func (m *Manager) GetStatus() (*PoolStatus, error) {
|
||
total, _ := m.storage.Count()
|
||
httpCount, _ := m.storage.CountByProtocol("http")
|
||
socks5Count, _ := m.storage.CountByProtocol("socks5")
|
||
|
||
httpSlots, socks5Slots := m.cfg.CalculateSlots()
|
||
|
||
// 计算平均延迟
|
||
avgHTTP, _ := m.storage.GetAverageLatency("http")
|
||
avgSOCKS5, _ := m.storage.GetAverageLatency("socks5")
|
||
|
||
// 判断状态
|
||
state := m.determineState(total, httpCount, socks5Count)
|
||
|
||
return &PoolStatus{
|
||
Total: total,
|
||
HTTP: httpCount,
|
||
SOCKS5: socks5Count,
|
||
HTTPSlots: httpSlots,
|
||
SOCKS5Slots: socks5Slots,
|
||
State: state,
|
||
AvgLatencyHTTP: avgHTTP,
|
||
AvgLatencySocks5: avgSOCKS5,
|
||
}, nil
|
||
}
|
||
|
||
// determineState 判断池子状态
|
||
func (m *Manager) determineState(total, httpCount, socks5Count int) string {
|
||
httpSlots, socks5Slots := m.cfg.CalculateSlots()
|
||
|
||
// 单协议缺失
|
||
if httpCount == 0 || socks5Count == 0 {
|
||
return "emergency"
|
||
}
|
||
|
||
// 紧急:总数<10%
|
||
emergencyThreshold := int(float64(m.cfg.PoolMaxSize) * 0.1)
|
||
if total < emergencyThreshold {
|
||
return "emergency"
|
||
}
|
||
|
||
// 危急:任一协议<20%槽位
|
||
if httpCount < int(float64(httpSlots)*0.2) || socks5Count < int(float64(socks5Slots)*0.2) {
|
||
return "critical"
|
||
}
|
||
|
||
// 警告:总数<95%
|
||
healthyThreshold := int(float64(m.cfg.PoolMaxSize) * 0.95)
|
||
if total < healthyThreshold {
|
||
return "warning"
|
||
}
|
||
|
||
// 健康
|
||
return "healthy"
|
||
}
|
||
|
||
// NeedsFetch 判断是否需要抓取以及抓取模式
|
||
func (m *Manager) NeedsFetch(status *PoolStatus) (bool, string, string) {
|
||
// 单协议缺失:紧急模式,指定协议
|
||
if status.HTTP == 0 {
|
||
return true, "emergency", "http"
|
||
}
|
||
if status.SOCKS5 == 0 {
|
||
return true, "emergency", "socks5"
|
||
}
|
||
|
||
// 紧急状态:紧急模式
|
||
if status.State == "emergency" {
|
||
return true, "emergency", ""
|
||
}
|
||
|
||
// 危急或警告:补充模式
|
||
if status.State == "critical" || status.State == "warning" {
|
||
// 判断哪个协议更缺
|
||
httpPct := float64(status.HTTP) / float64(status.HTTPSlots)
|
||
socks5Pct := float64(status.SOCKS5) / float64(status.SOCKS5Slots)
|
||
|
||
if httpPct < 0.5 {
|
||
return true, "refill", "http"
|
||
}
|
||
if socks5Pct < 0.5 {
|
||
return true, "refill", "socks5"
|
||
}
|
||
return true, "refill", ""
|
||
}
|
||
|
||
// 健康状态:不需要补充抓取
|
||
return false, "", ""
|
||
}
|
||
|
||
// NeedsFetchQuick 快速判断是否还需要抓取(用于提前终止)
|
||
func (m *Manager) NeedsFetchQuick(status *PoolStatus) bool {
|
||
need, _, _ := m.NeedsFetch(status)
|
||
return need
|
||
}
|
||
|
||
// TryAddProxy 尝试将代理加入池子
|
||
func (m *Manager) TryAddProxy(p storage.Proxy) (bool, string) {
|
||
httpSlots, socks5Slots := m.cfg.CalculateSlots()
|
||
httpCount, _ := m.storage.CountByProtocol("http")
|
||
socks5Count, _ := m.storage.CountByProtocol("socks5")
|
||
total, _ := m.storage.Count()
|
||
|
||
var maxSlots int
|
||
var currentCount int
|
||
if p.Protocol == "http" {
|
||
maxSlots = httpSlots
|
||
currentCount = httpCount
|
||
} else {
|
||
maxSlots = socks5Slots
|
||
currentCount = socks5Count
|
||
}
|
||
|
||
// 情况1:该协议槽位未满,直接加入
|
||
if currentCount < maxSlots {
|
||
if err := m.storage.AddProxy(p.Address, p.Protocol); err != nil {
|
||
return false, "db_error"
|
||
}
|
||
// 更新完整信息
|
||
m.storage.UpdateExitInfo(p.Address, p.ExitIP, p.ExitLocation, p.Latency)
|
||
log.Printf("[pool] ✅ 直接入池: %s (%s %d/%d) %dms %s %s",
|
||
p.Address, p.Protocol, currentCount+1, maxSlots, p.Latency, p.ExitIP, p.ExitLocation)
|
||
return true, "added"
|
||
}
|
||
|
||
// 情况2:槽位满,但允许10%浮动
|
||
allowedFloat := int(float64(maxSlots) * 0.1)
|
||
if total < m.cfg.PoolMaxSize && currentCount < maxSlots+allowedFloat {
|
||
if err := m.storage.AddProxy(p.Address, p.Protocol); err != nil {
|
||
return false, "db_error"
|
||
}
|
||
m.storage.UpdateExitInfo(p.Address, p.ExitIP, p.ExitLocation, p.Latency)
|
||
log.Printf("[pool] ✅ 浮动入池: %s (%s %d/%d+%d) %dms",
|
||
p.Address, p.Protocol, currentCount+1, maxSlots, allowedFloat, p.Latency)
|
||
return true, "added_float"
|
||
}
|
||
|
||
// 情况3:池子满了,尝试替换
|
||
if currentCount >= maxSlots || total >= m.cfg.PoolMaxSize {
|
||
return m.tryReplace(p)
|
||
}
|
||
|
||
return false, "slots_full"
|
||
}
|
||
|
||
// tryReplace 尝试替换现有代理
|
||
func (m *Manager) tryReplace(newProxy storage.Proxy) (bool, string) {
|
||
// 获取同协议中可替换的代理(延迟最高的前10个)
|
||
candidates, err := m.storage.GetWorstProxies(newProxy.Protocol, 10)
|
||
if err != nil || len(candidates) == 0 {
|
||
return false, "no_candidates"
|
||
}
|
||
|
||
worst := candidates[0]
|
||
|
||
// 判断是否值得替换:新代理需要显著更快
|
||
threshold := m.cfg.ReplaceThreshold
|
||
if float64(newProxy.Latency) < float64(worst.Latency)*threshold {
|
||
if err := m.storage.ReplaceProxy(worst.Address, newProxy); err != nil {
|
||
return false, "replace_error"
|
||
}
|
||
log.Printf("[pool] 🔄 替换: %s(%dms) → %s(%dms) 提升%.0f%%",
|
||
worst.Address, worst.Latency, newProxy.Address, newProxy.Latency,
|
||
(1-float64(newProxy.Latency)/float64(worst.Latency))*100)
|
||
return true, "replaced"
|
||
}
|
||
|
||
return false, "not_better"
|
||
}
|
||
|
||
// AdjustForConfigChange 配置变更后调整池子
|
||
func (m *Manager) AdjustForConfigChange(oldSize int, oldRatio float64) {
|
||
newHTTP, newSOCKS5 := m.cfg.CalculateSlots()
|
||
oldHTTP := int(float64(oldSize) * oldRatio)
|
||
oldSOCKS5 := oldSize - oldHTTP
|
||
|
||
log.Printf("[pool] 配置变更: 容量 %d→%d, HTTP槽位 %d→%d, SOCKS5槽位 %d→%d",
|
||
oldSize, m.cfg.PoolMaxSize, oldHTTP, newHTTP, oldSOCKS5, newSOCKS5)
|
||
|
||
// 如果槽位减少且当前超标,标记超标的为替换候选
|
||
httpCount, _ := m.storage.CountByProtocol("http")
|
||
socks5Count, _ := m.storage.CountByProtocol("socks5")
|
||
|
||
if httpCount > newHTTP {
|
||
excess := httpCount - newHTTP
|
||
log.Printf("[pool] HTTP 超标 %d 个,标记为替换候选", excess)
|
||
// 这里可以标记延迟高的为候选
|
||
}
|
||
|
||
if socks5Count > newSOCKS5 {
|
||
excess := socks5Count - newSOCKS5
|
||
log.Printf("[pool] SOCKS5 超标 %d 个,标记为替换候选", excess)
|
||
}
|
||
}
|