Scaling adjustments
This commit is contained in:
parent
d0687b4f99
commit
2a50ff7186
14
daemons.go
14
daemons.go
|
@ -3,6 +3,7 @@ package prox5
|
|||
import (
|
||||
"errors"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
@ -45,11 +46,19 @@ func (p5 *ProxyEngine) recycling() int {
|
|||
case !p5.GetRecyclingStatus(), p5.proxyMap.plot.Count() < 1:
|
||||
return 0
|
||||
default:
|
||||
select {
|
||||
case <-p5.recyleTimer.C:
|
||||
break
|
||||
default:
|
||||
return 0
|
||||
}
|
||||
}
|
||||
|
||||
var count = 0
|
||||
var printedFull = false
|
||||
|
||||
var o = &sync.Once{}
|
||||
|
||||
for tuple := range p5.proxyMap.plot.IterBuffered() {
|
||||
select {
|
||||
case <-p5.ctx.Done():
|
||||
|
@ -57,9 +66,12 @@ func (p5 *ProxyEngine) recycling() int {
|
|||
case p5.Pending <- tuple.Val:
|
||||
count++
|
||||
default:
|
||||
atomic.AddInt64(&p5.stats.timesChannelFull, 1)
|
||||
o.Do(func() {
|
||||
atomic.AddInt64(&p5.stats.timesChannelFull, 1)
|
||||
})
|
||||
if !printedFull {
|
||||
if time.Since(p5.stats.lastWarnedChannelFull) > 5*time.Second {
|
||||
p5.scale()
|
||||
p5.DebugLogger.Print("can't recycle, channel full")
|
||||
printedFull = true
|
||||
p5.stats.lastWarnedChannelFull = time.Now()
|
||||
|
|
5
defs.go
5
defs.go
|
@ -60,6 +60,8 @@ type ProxyEngine struct {
|
|||
scaler *scaler.AutoScaler
|
||||
scaleTimer *time.Ticker
|
||||
|
||||
recyleTimer *time.Ticker
|
||||
|
||||
opt *config
|
||||
runningdaemons int32
|
||||
conductor chan bool
|
||||
|
@ -200,7 +202,8 @@ func NewProxyEngine() *ProxyEngine {
|
|||
}))
|
||||
|
||||
pe.scaler = scaler.NewAutoScaler(pe.opt.maxWorkers, pe.opt.maxWorkers+100, 50)
|
||||
pe.scaleTimer = time.NewTicker(200 * time.Millisecond)
|
||||
pe.scaleTimer = time.NewTicker(750 * time.Millisecond)
|
||||
pe.recyleTimer = time.NewTicker(100 * time.Millisecond)
|
||||
|
||||
if err != nil {
|
||||
buf := strs.Get()
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package scaler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -24,6 +25,14 @@ func debug(msg string) {
|
|||
}
|
||||
}
|
||||
|
||||
func noopMsg(validated, dispensed int64, as *AutoScaler) string {
|
||||
if !debugSwitch {
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf("noop: validated: %d, dispensed: %d, mod: %d, max: %d, threshold: %d",
|
||||
validated, dispensed, atomic.LoadInt64(as.mod), atomic.LoadInt64(as.Max), atomic.LoadInt64(as.Threshold))
|
||||
}
|
||||
|
||||
const (
|
||||
stateDisabled autoScalerState = iota
|
||||
stateIdle
|
||||
|
@ -94,6 +103,11 @@ func (as *AutoScaler) SetBaseline(baseline int) {
|
|||
|
||||
// ScaleAnts scales the pool, it returns true if the pool scale has been changed, and false if not.
|
||||
func (as *AutoScaler) ScaleAnts(pool *ants.Pool, validated int64, dispensed int64) bool {
|
||||
if dispensed > validated {
|
||||
// consider panicing here...
|
||||
debug("dispensed > validated (FUBAR)")
|
||||
dispensed = validated
|
||||
}
|
||||
if atomic.LoadInt64(as.mod) < 0 {
|
||||
panic("scaler.go: scaler mod is negative")
|
||||
}
|
||||
|
@ -138,7 +152,7 @@ func (as *AutoScaler) ScaleAnts(pool *ants.Pool, validated int64, dispensed int6
|
|||
|
||||
switch {
|
||||
case noop:
|
||||
debug("noop")
|
||||
debug(noopMsg(validated, dispensed, as))
|
||||
return false
|
||||
case ((!needScaleUp && !needScaleDown) || atomic.LoadInt64(as.mod) == 0) && !idle:
|
||||
debug("not scaling up or down or mod is 0, and not idle, setting idle")
|
||||
|
|
1
proxy.go
1
proxy.go
|
@ -27,6 +27,7 @@ type Proxy struct {
|
|||
// Endpoint is the address:port of the proxy that we connect to
|
||||
Endpoint string
|
||||
// ProxiedIP is the address that we end up having when making proxied requests through this proxy
|
||||
// TODO: parse this and store as flat int type
|
||||
ProxiedIP string
|
||||
// protocol is the version/Protocol (currently SOCKS* only) of the proxy
|
||||
protocol proto
|
||||
|
|
|
@ -30,8 +30,6 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
|
|||
p5.DebugLogger.Printf("accounting: %d bad, %d full, %d accounted, %d net factors",
|
||||
totalBadNow, totalChanFullNow, accountedFor, netFactors)
|
||||
}
|
||||
atomic.AddInt64(&p5.stats.badAccounted, 1)
|
||||
atomic.AddInt64(&p5.stats.fullChanAccounted, 1)
|
||||
p5.stats.accountingLastDone = time.Now()
|
||||
}
|
||||
// this shouldn't happen..?
|
||||
|
@ -47,9 +45,9 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
|
|||
|
||||
// if we are considering more than we have validated, cap it at validated so that it registers properly.
|
||||
// additionally, signal the dialer to slow down a little.
|
||||
if totalConsidered > totalValidated {
|
||||
if totalConsidered >= totalValidated {
|
||||
sleep = true
|
||||
totalConsidered = totalValidated
|
||||
totalConsidered = totalValidated - atomic.LoadInt64(p5.scaler.Threshold)/2
|
||||
}
|
||||
|
||||
if p5.scaler.ScaleAnts(
|
||||
|
@ -57,6 +55,7 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
|
|||
totalValidated,
|
||||
totalConsidered,
|
||||
) {
|
||||
atomic.AddInt64(&p5.stats.fullChanAccounted, 1)
|
||||
p5.scaleDbg()
|
||||
}
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue