Files
GoProxy/pool/manager.go
isboyjc f55209d8d3 feat: init
2026-03-29 03:31:59 +08:00

230 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package pool
import (
"log"
"goproxy/config"
"goproxy/storage"
)
// Manager 池子管理器
type Manager struct {
storage *storage.Storage
cfg *config.Config
}
func NewManager(s *storage.Storage, cfg *config.Config) *Manager {
return &Manager{
storage: s,
cfg: cfg,
}
}
// PoolStatus 池子状态
type PoolStatus struct {
Total int
HTTP int
SOCKS5 int
HTTPSlots int
SOCKS5Slots int
State string // healthy/warning/critical/emergency
AvgLatencyHTTP int
AvgLatencySocks5 int
}
// GetStatus 获取当前池子状态
func (m *Manager) GetStatus() (*PoolStatus, error) {
total, _ := m.storage.Count()
httpCount, _ := m.storage.CountByProtocol("http")
socks5Count, _ := m.storage.CountByProtocol("socks5")
httpSlots, socks5Slots := m.cfg.CalculateSlots()
// 计算平均延迟
avgHTTP, _ := m.storage.GetAverageLatency("http")
avgSOCKS5, _ := m.storage.GetAverageLatency("socks5")
// 判断状态
state := m.determineState(total, httpCount, socks5Count)
return &PoolStatus{
Total: total,
HTTP: httpCount,
SOCKS5: socks5Count,
HTTPSlots: httpSlots,
SOCKS5Slots: socks5Slots,
State: state,
AvgLatencyHTTP: avgHTTP,
AvgLatencySocks5: avgSOCKS5,
}, nil
}
// determineState 判断池子状态
func (m *Manager) determineState(total, httpCount, socks5Count int) string {
httpSlots, socks5Slots := m.cfg.CalculateSlots()
// 单协议缺失
if httpCount == 0 || socks5Count == 0 {
return "emergency"
}
// 紧急:总数<10%
emergencyThreshold := int(float64(m.cfg.PoolMaxSize) * 0.1)
if total < emergencyThreshold {
return "emergency"
}
// 危急:任一协议<20%槽位
if httpCount < int(float64(httpSlots)*0.2) || socks5Count < int(float64(socks5Slots)*0.2) {
return "critical"
}
// 警告:总数<95%
healthyThreshold := int(float64(m.cfg.PoolMaxSize) * 0.95)
if total < healthyThreshold {
return "warning"
}
// 健康
return "healthy"
}
// NeedsFetch 判断是否需要抓取以及抓取模式
func (m *Manager) NeedsFetch(status *PoolStatus) (bool, string, string) {
// 单协议缺失:紧急模式,指定协议
if status.HTTP == 0 {
return true, "emergency", "http"
}
if status.SOCKS5 == 0 {
return true, "emergency", "socks5"
}
// 紧急状态:紧急模式
if status.State == "emergency" {
return true, "emergency", ""
}
// 危急或警告:补充模式
if status.State == "critical" || status.State == "warning" {
// 判断哪个协议更缺
httpPct := float64(status.HTTP) / float64(status.HTTPSlots)
socks5Pct := float64(status.SOCKS5) / float64(status.SOCKS5Slots)
if httpPct < 0.5 {
return true, "refill", "http"
}
if socks5Pct < 0.5 {
return true, "refill", "socks5"
}
return true, "refill", ""
}
// 健康状态:不需要补充抓取
return false, "", ""
}
// NeedsFetchQuick 快速判断是否还需要抓取(用于提前终止)
func (m *Manager) NeedsFetchQuick(status *PoolStatus) bool {
need, _, _ := m.NeedsFetch(status)
return need
}
// TryAddProxy 尝试将代理加入池子
func (m *Manager) TryAddProxy(p storage.Proxy) (bool, string) {
httpSlots, socks5Slots := m.cfg.CalculateSlots()
httpCount, _ := m.storage.CountByProtocol("http")
socks5Count, _ := m.storage.CountByProtocol("socks5")
total, _ := m.storage.Count()
var maxSlots int
var currentCount int
if p.Protocol == "http" {
maxSlots = httpSlots
currentCount = httpCount
} else {
maxSlots = socks5Slots
currentCount = socks5Count
}
// 情况1该协议槽位未满直接加入
if currentCount < maxSlots {
if err := m.storage.AddProxy(p.Address, p.Protocol); err != nil {
return false, "db_error"
}
// 更新完整信息
m.storage.UpdateExitInfo(p.Address, p.ExitIP, p.ExitLocation, p.Latency)
log.Printf("[pool] ✅ 直接入池: %s (%s %d/%d) %dms %s %s",
p.Address, p.Protocol, currentCount+1, maxSlots, p.Latency, p.ExitIP, p.ExitLocation)
return true, "added"
}
// 情况2槽位满但允许10%浮动
allowedFloat := int(float64(maxSlots) * 0.1)
if total < m.cfg.PoolMaxSize && currentCount < maxSlots+allowedFloat {
if err := m.storage.AddProxy(p.Address, p.Protocol); err != nil {
return false, "db_error"
}
m.storage.UpdateExitInfo(p.Address, p.ExitIP, p.ExitLocation, p.Latency)
log.Printf("[pool] ✅ 浮动入池: %s (%s %d/%d+%d) %dms",
p.Address, p.Protocol, currentCount+1, maxSlots, allowedFloat, p.Latency)
return true, "added_float"
}
// 情况3池子满了尝试替换
if currentCount >= maxSlots || total >= m.cfg.PoolMaxSize {
return m.tryReplace(p)
}
return false, "slots_full"
}
// tryReplace 尝试替换现有代理
func (m *Manager) tryReplace(newProxy storage.Proxy) (bool, string) {
// 获取同协议中可替换的代理延迟最高的前10个
candidates, err := m.storage.GetWorstProxies(newProxy.Protocol, 10)
if err != nil || len(candidates) == 0 {
return false, "no_candidates"
}
worst := candidates[0]
// 判断是否值得替换:新代理需要显著更快
threshold := m.cfg.ReplaceThreshold
if float64(newProxy.Latency) < float64(worst.Latency)*threshold {
if err := m.storage.ReplaceProxy(worst.Address, newProxy); err != nil {
return false, "replace_error"
}
log.Printf("[pool] 🔄 替换: %s(%dms) → %s(%dms) 提升%.0f%%",
worst.Address, worst.Latency, newProxy.Address, newProxy.Latency,
(1-float64(newProxy.Latency)/float64(worst.Latency))*100)
return true, "replaced"
}
return false, "not_better"
}
// AdjustForConfigChange 配置变更后调整池子
func (m *Manager) AdjustForConfigChange(oldSize int, oldRatio float64) {
newHTTP, newSOCKS5 := m.cfg.CalculateSlots()
oldHTTP := int(float64(oldSize) * oldRatio)
oldSOCKS5 := oldSize - oldHTTP
log.Printf("[pool] 配置变更: 容量 %d→%d, HTTP槽位 %d→%d, SOCKS5槽位 %d→%d",
oldSize, m.cfg.PoolMaxSize, oldHTTP, newHTTP, oldSOCKS5, newSOCKS5)
// 如果槽位减少且当前超标,标记超标的为替换候选
httpCount, _ := m.storage.CountByProtocol("http")
socks5Count, _ := m.storage.CountByProtocol("socks5")
if httpCount > newHTTP {
excess := httpCount - newHTTP
log.Printf("[pool] HTTP 超标 %d 个,标记为替换候选", excess)
// 这里可以标记延迟高的为候选
}
if socks5Count > newSOCKS5 {
excess := socks5Count - newSOCKS5
log.Printf("[pool] SOCKS5 超标 %d 个,标记为替换候选", excess)
}
}