Breaking change: refactor statistics, improve p5.CloseAllConns

This commit is contained in:
kayos@tcp.direct 2023-12-05 13:52:32 -08:00
parent a820ee839f
commit e2c7a443b4
Signed by: kayos
GPG Key ID: 4B841471B4BEE979
10 changed files with 68 additions and 55 deletions

View File

@ -70,24 +70,23 @@ func (p5 *ProxyEngine) Resume() error {
// Note this does not effect the proxy pool, it will continue to operate as normal.
// this is hacky FIXME
func (p5 *ProxyEngine) CloseAllConns() {
timeout := time.NewTicker(200 * time.Millisecond)
p5.conKiller <- struct{}{}
timeout := time.NewTicker(5 * time.Second)
defer func() {
timeout.Stop()
select {
case <-p5.conKiller:
case p5.conKiller <- struct{}{}:
default:
}
}()
for {
select {
case p5.conKiller <- struct{}{}:
timeout.Reset(1 * time.Second)
p5.DebugLogger.Printf("killed a connection")
case <-p5.ctx.Done():
return
case <-timeout.C:
return
case p5.conKiller <- struct{}{}:
timeout.Reset(500 * time.Millisecond)
p5.DebugLogger.Printf("killed a connection")
}
}
}

View File

@ -100,7 +100,9 @@ func (p5 *ProxyEngine) jobSpawner() {
// case <-p5.ctx.Done():
// default:
// }
p5.Pending.RLock()
if p5.Pending.Len() < 1 {
p5.Pending.RUnlock()
count := p5.recycling()
switch {
case count > 0:
@ -113,6 +115,8 @@ func (p5 *ProxyEngine) jobSpawner() {
time.Sleep(time.Millisecond * 100)
}
continue
} else {
p5.Pending.RUnlock()
}
var sock *Proxy

20
defs.go
View File

@ -65,7 +65,7 @@ type ProxyEngine struct {
DebugLogger logger.Logger
// stats holds the Statistics for ProxyEngine
stats *Statistics
stats Statistics
Status uint32
@ -205,7 +205,10 @@ type Swamp struct {
// After calling this you may use the various "setters" to change the options before calling ProxyEngine.Start().
func NewProxyEngine() *ProxyEngine {
p5 := &ProxyEngine{
stats: &Statistics{birthday: time.Now()},
stats: Statistics{
birthday: &atomic.Pointer[time.Time]{},
accountingLastDone: &atomic.Pointer[time.Time]{},
},
DebugLogger: &basicPrinter{},
opt: defOpt(),
@ -219,13 +222,20 @@ func NewProxyEngine() *ProxyEngine {
Status: uint32(stateNew),
}
tnow := time.Now()
p5.stats.birthday.Store(&tnow)
p5.stats.accountingLastDone.Store(&tnow)
p5.lastBadProxAnnnounced.Store("")
p5.httpOptsDirty.Store(false)
p5.httpClients = &sync.Pool{New: func() interface{} { return p5.newHTTPClient() }}
stats := []int64{p5.stats.Valid4, p5.stats.Valid4a, p5.stats.Valid5, p5.stats.ValidHTTP, p5.stats.Dispensed}
for i := range stats {
atomic.StoreInt64(&stats[i], 0)
stats := []**atomic.Int64{
&p5.stats.Valid4, &p5.stats.Valid4a, &p5.stats.Valid5, &p5.stats.ValidHTTP, &p5.stats.Dispensed,
&p5.stats.Checked, &p5.stats.badAccounted, &p5.stats.Stale,
}
for _, i := range stats {
*i = &atomic.Int64{}
}
lists := []*proxyList{&p5.Valids.SOCKS5, &p5.Valids.SOCKS4, &p5.Valids.SOCKS4a, &p5.Valids.HTTP, &p5.Pending}

View File

@ -71,7 +71,7 @@ func currentString(lastMessage string) string {
stats := swamp.GetStatistics()
wMax, wRun, wIdle := swamp.GetWorkers()
return fmt.Sprintf(statsFmt,
stats.GetUptime().Round(time.Second), int(stats.Valid4+stats.Valid4a+stats.Valid5),
stats.GetUptime().Round(time.Second), swamp.GetTotalValidated(),
stats.Dispensed, wMax, wRun, wIdle, swamp.GetAutoScalerStateString(), lastMessage)
}

View File

@ -8,9 +8,8 @@ import (
"git.tcp.direct/kayos/common/entropy"
)
// GetStatistics returns all current Statistics.
// * This is a pointer, do not modify it!
func (p5 *ProxyEngine) GetStatistics() *Statistics {
// GetStatistics returns all Statistics atomics.
func (p5 *ProxyEngine) GetStatistics() Statistics {
p5.mu.RLock()
defer p5.mu.RUnlock()
return p5.stats

View File

@ -42,11 +42,11 @@ func (p5 *ProxyEngine) addTimeout(socksString string) string {
}
func (p5 *ProxyEngine) isEmpty() bool {
stats := p5.GetStatistics()
if stats.Checked == 0 {
if p5.GetStatistics().Checked.Load() == 0 {
return true
}
if stats.Valid5+stats.Valid4+stats.Valid4a+stats.ValidHTTP == 0 {
// if stats.Valid5.Load()+stats.Valid4.Load()+stats.Valid4a.Load()+stats.ValidHTTP.Load() == 0 {
if p5.GetTotalValidated() == 0 {
return true
}
return false

View File

@ -19,11 +19,12 @@ import (
"git.tcp.direct/kayos/go-socks5"
)
func init() {
os.Setenv("PROX5_SCALER_DEBUG", "1")
}
var failures = &atomic.Int64{}
var failures int64 = 0
func init() {
_ = os.Setenv("PROX5_SCALER_DEBUG", "1")
failures.Store(0)
}
type randomFail struct {
t *testing.T
@ -49,7 +50,7 @@ func (rf *randomFail) fail() bool {
rf.t.Errorf("[FAIL] random SOCKS failure triggered too many times, total fail count: %d", rf.failedCount)
}
atomic.AddInt64(&failures, 1)
failures.Add(1)
return true
}
@ -125,14 +126,14 @@ func (tl p5TestLogger) Errorf(format string, args ...interface{}) {
func (tl p5TestLogger) Printf(format string, args ...interface{}) {
val := fmt.Sprintf(format, args...)
if strings.Contains(val, "failed to verify") {
atomic.AddInt64(&failures, 1)
failures.Add(1)
}
tl.t.Logf("[PRINT] " + val)
}
func (tl p5TestLogger) Print(args ...interface{}) {
val := fmt.Sprintf("%+v", args...)
if strings.Contains(val, "failed to verify") {
atomic.AddInt64(&failures, 1)
failures.Add(1)
}
tl.t.Log("[PRINT] " + val)
}
@ -178,7 +179,7 @@ func TestProx5(t *testing.T) {
}
time.Sleep(time.Second * 1)
got := p5.GetTotalValidated()
want := 55 - int(atomic.LoadInt64(&failures))
want := 55 - failures.Load()
if got != want {
t.Logf("[WARN] total validated proxies does not match expected, got: %d, expected: %d",
got, want)

View File

@ -21,15 +21,16 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
case <-p5.scaleTimer.C:
bad := int64(0)
totalBadNow := p5.GetTotalBad()
accountedFor := atomic.LoadInt64(&p5.stats.badAccounted)
accountedFor := p5.stats.badAccounted.Load()
netFactors := totalBadNow - accountedFor
if time.Since(p5.stats.accountingLastDone) > 5*time.Second && netFactors > 0 {
bad = int64(netFactors)
if time.Since(*p5.stats.accountingLastDone.Load()) > 5*time.Second && netFactors > 0 {
bad = netFactors
if p5.DebugEnabled() {
p5.DebugLogger.Printf("accounting: %d bad - %d accounted for = %d net factors",
totalBadNow, accountedFor, netFactors)
}
p5.stats.accountingLastDone = time.Now()
tnow := time.Now()
p5.stats.accountingLastDone.Store(&tnow)
}
// this shouldn't happen..?
if bad < 0 {
@ -39,8 +40,8 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
return
}
totalValidated := int64(p5.GetTotalValidated())
totalConsidered := int64(atomic.LoadInt64(&p5.GetStatistics().Dispensed)) + bad
totalValidated := p5.GetTotalValidated()
totalConsidered := p5.GetStatistics().Dispensed.Load() + bad
// if we are considering more than we have validated, cap it at validated so that it registers properly.
// additionally, signal the dialer to slow down a little.

View File

@ -8,60 +8,59 @@ import (
// Statistics is used to encapsulate various proxy engine stats
type Statistics struct {
// Valid4 is the amount of SOCKS4 proxies validated
Valid4 int64
Valid4 *atomic.Int64
// Valid4a is the amount of SOCKS4a proxies validated
Valid4a int64
Valid4a *atomic.Int64
// Valid5 is the amount of SOCKS5 proxies validated
Valid5 int64
Valid5 *atomic.Int64
// ValidHTTP is the amount of HTTP proxies validated
ValidHTTP int64
ValidHTTP *atomic.Int64
// Dispensed is a simple ticker to keep track of proxies dispensed via our getters
Dispensed int64
Dispensed *atomic.Int64
// Stale is the amount of proxies that failed our stale policy upon dispensing
Stale int64
Stale *atomic.Int64
// Checked is the amount of proxies we've checked.
Checked int64
Checked *atomic.Int64
// birthday represents the time we started checking proxies with this pool
birthday time.Time
birthday *atomic.Pointer[time.Time]
badAccounted int64
accountingLastDone time.Time
badAccounted *atomic.Int64
accountingLastDone *atomic.Pointer[time.Time]
}
func (stats *Statistics) dispense() {
atomic.AddInt64(&stats.Dispensed, 1)
stats.Dispensed.Add(1)
}
func (stats *Statistics) stale() {
atomic.AddInt64(&stats.Stale, 1)
stats.Stale.Add(1)
}
func (stats *Statistics) v4() {
atomic.AddInt64(&stats.Valid4, 1)
stats.Valid4.Add(1)
}
func (stats *Statistics) v4a() {
atomic.AddInt64(&stats.Valid4a, 1)
stats.Valid4a.Add(1)
}
func (stats *Statistics) v5() {
atomic.AddInt64(&stats.Valid5, 1)
stats.Valid5.Add(1)
}
func (stats *Statistics) http() {
atomic.AddInt64(&stats.ValidHTTP, 1)
stats.ValidHTTP.Add(1)
}
// GetTotalValidated retrieves our grand total validated proxy count.
func (p5 *ProxyEngine) GetTotalValidated() int {
func (p5 *ProxyEngine) GetTotalValidated() int64 {
stats := p5.GetStatistics()
total := int64(0)
for _, val := range []*int64{&stats.Valid4a, &stats.Valid4, &stats.Valid5, &stats.ValidHTTP} {
atomic.AddInt64(&total, atomic.LoadInt64(val))
for _, val := range []*atomic.Int64{stats.Valid4a, stats.Valid4, stats.Valid5, stats.ValidHTTP} {
total += val.Load()
}
return int(total)
return total
}
func (p5 *ProxyEngine) GetTotalBad() int64 {
@ -71,5 +70,5 @@ func (p5 *ProxyEngine) GetTotalBad() int64 {
// GetUptime returns the total lifetime duration of our pool.
func (stats *Statistics) GetUptime() time.Duration {
return time.Since(stats.birthday)
return time.Since(*stats.birthday.Load())
}

View File

@ -134,7 +134,7 @@ func (p5 *ProxyEngine) validate(hmd *handMeDown) (string, error) {
}
func (p5 *ProxyEngine) anothaOne() {
atomic.AddInt64(&p5.stats.Checked, 1)
p5.stats.Checked.Add(1)
}
type handMeDown struct {