From c41bb06f3d31e893c258502e66e692bf5c27075d Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Tue, 5 Dec 2023 13:52:32 -0800 Subject: [PATCH] Breaking change: refactor statistics, improve p5.CloseAllConns --- conductor.go | 11 +++++------ daemons.go | 4 ++++ defs.go | 20 +++++++++++++++----- example/main.go | 2 +- getters.go | 5 ++--- mystery_dialer.go | 6 +++--- prox5_test.go | 17 +++++++++-------- scale_util.go | 13 +++++++------ stats.go | 43 +++++++++++++++++++++---------------------- validator_engine.go | 2 +- 10 files changed, 68 insertions(+), 55 deletions(-) diff --git a/conductor.go b/conductor.go index 95af0e9..838949e 100644 --- a/conductor.go +++ b/conductor.go @@ -70,24 +70,23 @@ func (p5 *ProxyEngine) Resume() error { // Note this does not effect the proxy pool, it will continue to operate as normal. // this is hacky FIXME func (p5 *ProxyEngine) CloseAllConns() { - timeout := time.NewTicker(200 * time.Millisecond) - p5.conKiller <- struct{}{} + timeout := time.NewTicker(5 * time.Second) defer func() { timeout.Stop() select { - case <-p5.conKiller: + case p5.conKiller <- struct{}{}: default: } }() for { select { + case p5.conKiller <- struct{}{}: + timeout.Reset(1 * time.Second) + p5.DebugLogger.Printf("killed a connection") case <-p5.ctx.Done(): return case <-timeout.C: return - case p5.conKiller <- struct{}{}: - timeout.Reset(500 * time.Millisecond) - p5.DebugLogger.Printf("killed a connection") } } } diff --git a/daemons.go b/daemons.go index a25fe3a..270814e 100644 --- a/daemons.go +++ b/daemons.go @@ -100,7 +100,9 @@ func (p5 *ProxyEngine) jobSpawner() { // case <-p5.ctx.Done(): // default: // } + p5.Pending.RLock() if p5.Pending.Len() < 1 { + p5.Pending.RUnlock() count := p5.recycling() switch { case count > 0: @@ -113,6 +115,8 @@ func (p5 *ProxyEngine) jobSpawner() { time.Sleep(time.Millisecond * 100) } continue + } else { + p5.Pending.RUnlock() } var sock *Proxy diff --git a/defs.go b/defs.go index 5797063..c9f93de 100644 --- a/defs.go +++ b/defs.go @@ -65,7 +65,7 @@ type ProxyEngine struct { DebugLogger logger.Logger // stats holds the Statistics for ProxyEngine - stats *Statistics + stats Statistics Status uint32 @@ -205,7 +205,10 @@ type Swamp struct { // After calling this you may use the various "setters" to change the options before calling ProxyEngine.Start(). func NewProxyEngine() *ProxyEngine { p5 := &ProxyEngine{ - stats: &Statistics{birthday: time.Now()}, + stats: Statistics{ + birthday: &atomic.Pointer[time.Time]{}, + accountingLastDone: &atomic.Pointer[time.Time]{}, + }, DebugLogger: &basicPrinter{}, opt: defOpt(), @@ -219,13 +222,20 @@ func NewProxyEngine() *ProxyEngine { Status: uint32(stateNew), } + tnow := time.Now() + p5.stats.birthday.Store(&tnow) + p5.stats.accountingLastDone.Store(&tnow) + p5.lastBadProxAnnnounced.Store("") p5.httpOptsDirty.Store(false) p5.httpClients = &sync.Pool{New: func() interface{} { return p5.newHTTPClient() }} - stats := []int64{p5.stats.Valid4, p5.stats.Valid4a, p5.stats.Valid5, p5.stats.ValidHTTP, p5.stats.Dispensed} - for i := range stats { - atomic.StoreInt64(&stats[i], 0) + stats := []**atomic.Int64{ + &p5.stats.Valid4, &p5.stats.Valid4a, &p5.stats.Valid5, &p5.stats.ValidHTTP, &p5.stats.Dispensed, + &p5.stats.Checked, &p5.stats.badAccounted, &p5.stats.Stale, + } + for _, i := range stats { + *i = &atomic.Int64{} } lists := []*proxyList{&p5.Valids.SOCKS5, &p5.Valids.SOCKS4, &p5.Valids.SOCKS4a, &p5.Valids.HTTP, &p5.Pending} diff --git a/example/main.go b/example/main.go index c331126..fd72e55 100644 --- a/example/main.go +++ b/example/main.go @@ -71,7 +71,7 @@ func currentString(lastMessage string) string { stats := swamp.GetStatistics() wMax, wRun, wIdle := swamp.GetWorkers() return fmt.Sprintf(statsFmt, - stats.GetUptime().Round(time.Second), int(stats.Valid4+stats.Valid4a+stats.Valid5), + stats.GetUptime().Round(time.Second), swamp.GetTotalValidated(), stats.Dispensed, wMax, wRun, wIdle, swamp.GetAutoScalerStateString(), lastMessage) } diff --git a/getters.go b/getters.go index 9f3fa9c..0ae475a 100644 --- a/getters.go +++ b/getters.go @@ -8,9 +8,8 @@ import ( "git.tcp.direct/kayos/common/entropy" ) -// GetStatistics returns all current Statistics. -// * This is a pointer, do not modify it! -func (p5 *ProxyEngine) GetStatistics() *Statistics { +// GetStatistics returns all Statistics atomics. +func (p5 *ProxyEngine) GetStatistics() Statistics { p5.mu.RLock() defer p5.mu.RUnlock() return p5.stats diff --git a/mystery_dialer.go b/mystery_dialer.go index 5d57470..988a83a 100644 --- a/mystery_dialer.go +++ b/mystery_dialer.go @@ -42,11 +42,11 @@ func (p5 *ProxyEngine) addTimeout(socksString string) string { } func (p5 *ProxyEngine) isEmpty() bool { - stats := p5.GetStatistics() - if stats.Checked == 0 { + if p5.GetStatistics().Checked.Load() == 0 { return true } - if stats.Valid5+stats.Valid4+stats.Valid4a+stats.ValidHTTP == 0 { + // if stats.Valid5.Load()+stats.Valid4.Load()+stats.Valid4a.Load()+stats.ValidHTTP.Load() == 0 { + if p5.GetTotalValidated() == 0 { return true } return false diff --git a/prox5_test.go b/prox5_test.go index f646302..ae1e840 100644 --- a/prox5_test.go +++ b/prox5_test.go @@ -19,11 +19,12 @@ import ( "git.tcp.direct/kayos/go-socks5" ) -func init() { - os.Setenv("PROX5_SCALER_DEBUG", "1") -} +var failures = &atomic.Int64{} -var failures int64 = 0 +func init() { + _ = os.Setenv("PROX5_SCALER_DEBUG", "1") + failures.Store(0) +} type randomFail struct { t *testing.T @@ -49,7 +50,7 @@ func (rf *randomFail) fail() bool { rf.t.Errorf("[FAIL] random SOCKS failure triggered too many times, total fail count: %d", rf.failedCount) } - atomic.AddInt64(&failures, 1) + failures.Add(1) return true } @@ -125,14 +126,14 @@ func (tl p5TestLogger) Errorf(format string, args ...interface{}) { func (tl p5TestLogger) Printf(format string, args ...interface{}) { val := fmt.Sprintf(format, args...) if strings.Contains(val, "failed to verify") { - atomic.AddInt64(&failures, 1) + failures.Add(1) } tl.t.Logf("[PRINT] " + val) } func (tl p5TestLogger) Print(args ...interface{}) { val := fmt.Sprintf("%+v", args...) if strings.Contains(val, "failed to verify") { - atomic.AddInt64(&failures, 1) + failures.Add(1) } tl.t.Log("[PRINT] " + val) } @@ -178,7 +179,7 @@ func TestProx5(t *testing.T) { } time.Sleep(time.Second * 1) got := p5.GetTotalValidated() - want := 55 - int(atomic.LoadInt64(&failures)) + want := 55 - failures.Load() if got != want { t.Logf("[WARN] total validated proxies does not match expected, got: %d, expected: %d", got, want) diff --git a/scale_util.go b/scale_util.go index 65fe2ee..7206738 100644 --- a/scale_util.go +++ b/scale_util.go @@ -21,15 +21,16 @@ func (p5 *ProxyEngine) scale() (sleep bool) { case <-p5.scaleTimer.C: bad := int64(0) totalBadNow := p5.GetTotalBad() - accountedFor := atomic.LoadInt64(&p5.stats.badAccounted) + accountedFor := p5.stats.badAccounted.Load() netFactors := totalBadNow - accountedFor - if time.Since(p5.stats.accountingLastDone) > 5*time.Second && netFactors > 0 { - bad = int64(netFactors) + if time.Since(*p5.stats.accountingLastDone.Load()) > 5*time.Second && netFactors > 0 { + bad = netFactors if p5.DebugEnabled() { p5.DebugLogger.Printf("accounting: %d bad - %d accounted for = %d net factors", totalBadNow, accountedFor, netFactors) } - p5.stats.accountingLastDone = time.Now() + tnow := time.Now() + p5.stats.accountingLastDone.Store(&tnow) } // this shouldn't happen..? if bad < 0 { @@ -39,8 +40,8 @@ func (p5 *ProxyEngine) scale() (sleep bool) { return } - totalValidated := int64(p5.GetTotalValidated()) - totalConsidered := int64(atomic.LoadInt64(&p5.GetStatistics().Dispensed)) + bad + totalValidated := p5.GetTotalValidated() + totalConsidered := p5.GetStatistics().Dispensed.Load() + bad // if we are considering more than we have validated, cap it at validated so that it registers properly. // additionally, signal the dialer to slow down a little. diff --git a/stats.go b/stats.go index 2688c12..47f50a3 100644 --- a/stats.go +++ b/stats.go @@ -8,60 +8,59 @@ import ( // Statistics is used to encapsulate various proxy engine stats type Statistics struct { // Valid4 is the amount of SOCKS4 proxies validated - Valid4 int64 + Valid4 *atomic.Int64 // Valid4a is the amount of SOCKS4a proxies validated - Valid4a int64 + Valid4a *atomic.Int64 // Valid5 is the amount of SOCKS5 proxies validated - Valid5 int64 + Valid5 *atomic.Int64 // ValidHTTP is the amount of HTTP proxies validated - ValidHTTP int64 + ValidHTTP *atomic.Int64 // Dispensed is a simple ticker to keep track of proxies dispensed via our getters - Dispensed int64 + Dispensed *atomic.Int64 // Stale is the amount of proxies that failed our stale policy upon dispensing - Stale int64 + Stale *atomic.Int64 // Checked is the amount of proxies we've checked. - Checked int64 + Checked *atomic.Int64 // birthday represents the time we started checking proxies with this pool - birthday time.Time + birthday *atomic.Pointer[time.Time] - badAccounted int64 - accountingLastDone time.Time + badAccounted *atomic.Int64 + accountingLastDone *atomic.Pointer[time.Time] } func (stats *Statistics) dispense() { - atomic.AddInt64(&stats.Dispensed, 1) + stats.Dispensed.Add(1) } func (stats *Statistics) stale() { - atomic.AddInt64(&stats.Stale, 1) + stats.Stale.Add(1) } func (stats *Statistics) v4() { - atomic.AddInt64(&stats.Valid4, 1) + stats.Valid4.Add(1) } func (stats *Statistics) v4a() { - atomic.AddInt64(&stats.Valid4a, 1) + stats.Valid4a.Add(1) } func (stats *Statistics) v5() { - atomic.AddInt64(&stats.Valid5, 1) + stats.Valid5.Add(1) } func (stats *Statistics) http() { - atomic.AddInt64(&stats.ValidHTTP, 1) + stats.ValidHTTP.Add(1) } // GetTotalValidated retrieves our grand total validated proxy count. -func (p5 *ProxyEngine) GetTotalValidated() int { +func (p5 *ProxyEngine) GetTotalValidated() int64 { stats := p5.GetStatistics() total := int64(0) - for _, val := range []*int64{&stats.Valid4a, &stats.Valid4, &stats.Valid5, &stats.ValidHTTP} { - atomic.AddInt64(&total, atomic.LoadInt64(val)) + for _, val := range []*atomic.Int64{stats.Valid4a, stats.Valid4, stats.Valid5, stats.ValidHTTP} { + total += val.Load() } - - return int(total) + return total } func (p5 *ProxyEngine) GetTotalBad() int64 { @@ -71,5 +70,5 @@ func (p5 *ProxyEngine) GetTotalBad() int64 { // GetUptime returns the total lifetime duration of our pool. func (stats *Statistics) GetUptime() time.Duration { - return time.Since(stats.birthday) + return time.Since(*stats.birthday.Load()) } diff --git a/validator_engine.go b/validator_engine.go index ef156a4..4bfebea 100644 --- a/validator_engine.go +++ b/validator_engine.go @@ -134,7 +134,7 @@ func (p5 *ProxyEngine) validate(hmd *handMeDown) (string, error) { } func (p5 *ProxyEngine) anothaOne() { - atomic.AddInt64(&p5.stats.Checked, 1) + p5.stats.Checked.Add(1) } type handMeDown struct {