Complete teardown (broken atm)

This commit is contained in:
kayos@tcp.direct 2023-01-31 01:21:29 -08:00
parent 805740869c
commit 7ade879676
Signed by: kayos
GPG Key ID: 4B841471B4BEE979
16 changed files with 366 additions and 240 deletions

View File

@ -10,17 +10,18 @@ import (
type engineState uint32
const (
// StateRunning means the proxy pool is currently taking in proxys and validating them, and is available to dispense proxies.
StateRunning engineState = iota
// StatePaused means the proxy pool has been with ProxyEngine.Pause() and may be resumed with ProxyEngine.Resume()
StatePaused
// StateNew means the proxy pool has never been started.
StateNew
// stateRunning means the proxy pool is currently taking in proxys and validating them, and is available to dispense proxies.
stateRunning engineState = iota
// statePaused means the proxy pool has been with ProxyEngine.Pause() and may be resumed with ProxyEngine.Resume()
statePaused
// stateNew means the proxy pool has never been started.
stateNew
)
// Start starts our proxy pool operations. Trying to start a running ProxyEngine will return an error.
func (p5 *ProxyEngine) Start() error {
if atomic.LoadUint32(&p5.Status) != uint32(StateNew) {
if atomic.LoadUint32(&p5.Status) != uint32(stateNew) {
p5.dbgPrint(simpleString("proxy pool has been started before, resuming instead"))
return p5.Resume()
}
p5.startDaemons()
@ -43,13 +44,13 @@ func (p5 *ProxyEngine) Pause() error {
p5.quit()
atomic.StoreUint32(&p5.Status, uint32(StatePaused))
atomic.StoreUint32(&p5.Status, uint32(statePaused))
return nil
}
func (p5 *ProxyEngine) startDaemons() {
go p5.jobSpawner()
atomic.StoreUint32(&p5.Status, uint32(StateRunning))
atomic.StoreUint32(&p5.Status, uint32(stateRunning))
}
// Resume will resume pause proxy pool operations, attempting to resume a running ProxyEngine is returns an error.
@ -61,3 +62,9 @@ func (p5 *ProxyEngine) Resume() error {
p5.startDaemons()
return nil
}
// CloseAllConns will close all connections in progress by the dialers (including the SOCKS server if in use).
// Note this does not effect the proxy pool, it will continue to operate as normal.
func (p5 *ProxyEngine) CloseAllConns() {
p5.killConns()
}

View File

@ -3,10 +3,9 @@ package prox5
import (
"errors"
"strconv"
"sync"
"sync/atomic"
"time"
"git.tcp.direct/kayos/common/entropy"
cmap "github.com/orcaman/concurrent-map/v2"
)
@ -42,12 +41,17 @@ func (sm proxyMap) clear() {
}
func (p5 *ProxyEngine) recycling() int {
if !p5.recycleMu.TryLock() {
return 0
}
defer p5.recycleMu.Unlock()
switch {
case !p5.GetRecyclingStatus(), p5.proxyMap.plot.Count() < 1:
return 0
default:
select {
case <-p5.recyleTimer.C:
case <-p5.recycleTimer.C:
break
default:
return 0
@ -55,43 +59,49 @@ func (p5 *ProxyEngine) recycling() int {
}
var count = 0
var printedFull = false
var o = &sync.Once{}
tpls := make(chan cmap.Tuple[string, *Proxy], p5.proxyMap.plot.Count())
for tuple := range p5.proxyMap.plot.IterBuffered() {
select {
case <-p5.ctx.Done():
return 0
case p5.Pending <- tuple.Val:
count++
default:
o.Do(func() {
atomic.AddInt64(&p5.stats.timesChannelFull, 1)
})
if !printedFull {
if time.Since(p5.stats.lastWarnedChannelFull) > 20*time.Second {
p5.scale()
p5.DebugLogger.Print("can't recycle, channel full")
printedFull = true
p5.stats.lastWarnedChannelFull = time.Now()
}
recycleTuples := func(items chan cmap.Tuple[string, *Proxy]) {
for tuple := range items {
select {
case <-p5.ctx.Done():
return
default:
//
}
time.Sleep(1 * time.Second)
continue
p5.Pending.add(tuple.Val)
count++
}
}
switch p5.GetRecyclerShuffleStatus() {
case true:
var tuples []cmap.Tuple[string, *Proxy]
for tuple := range p5.proxyMap.plot.IterBuffered() {
tuples = append(tuples, tuple)
}
entropy.GetOptimizedRand().Shuffle(len(tuples), func(i, j int) {
tuples[i], tuples[j] = tuples[j], tuples[i]
})
for _, tuple := range tuples {
tpls <- tuple
}
case false:
for tuple := range p5.proxyMap.plot.IterBuffered() {
tpls <- tuple
}
}
recycleTuples(tpls)
return count
}
func (p5 *ProxyEngine) jobSpawner() {
if p5.pool.IsClosed() {
p5.pool.Reboot()
}
p5.pool.Reboot()
p5.dbgPrint(simpleString("job spawner started"))
defer p5.dbgPrint(simpleString("job spawner paused"))
q := make(chan bool)
@ -101,13 +111,12 @@ func (p5 *ProxyEngine) jobSpawner() {
case <-p5.ctx.Done():
q <- true
return
case sock := <-p5.Pending:
_ = p5.scale()
if err := p5.pool.Submit(sock.validate); err != nil {
p5.dbgPrint(simpleString(err.Error()))
}
default:
}
p5.Pending.RLock()
if p5.Pending.Len() < 1 {
p5.Pending.RUnlock()
count := p5.recycling()
if count > 0 {
buf := strs.Get()
@ -116,10 +125,24 @@ func (p5 *ProxyEngine) jobSpawner() {
buf.MustWriteString(" proxies from our map")
p5.dbgPrint(buf)
}
time.Sleep(time.Millisecond * 500)
continue
}
p5.Pending.RUnlock()
p5.Pending.Lock()
sock := p5.Pending.Remove(p5.Pending.Front()).(*Proxy)
p5.Pending.Unlock()
_ = p5.scale()
if err := p5.pool.Submit(sock.validate); err != nil {
p5.dbgPrint(simpleString(err.Error()))
}
}
}()
<-q
p5.dbgPrint(simpleString("job spawner paused"))
p5.pool.Release()
}

85
defs.go
View File

@ -1,11 +1,13 @@
package prox5
import (
"container/list"
"context"
"sync"
"sync/atomic"
"time"
"git.tcp.direct/kayos/common/entropy"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/panjf2000/ants/v2"
rl "github.com/yunginnanet/Rate5"
@ -14,16 +16,47 @@ import (
"git.tcp.direct/kayos/prox5/logger"
)
type proxyList struct {
*list.List
*sync.RWMutex
}
func (pl proxyList) add(p *Proxy) {
pl.Lock()
defer pl.Unlock()
pl.PushBack(p)
}
func (pl proxyList) pop() *Proxy {
pl.Lock()
if pl.Len() < 1 {
pl.Unlock()
return nil
}
p := pl.Remove(pl.Front()).(*Proxy)
pl.Unlock()
return p
}
// ProxyChannels will likely be unexported in the future.
type ProxyChannels struct {
// SOCKS5 is a constant stream of verified SOCKS5 proxies
SOCKS5 chan *Proxy
SOCKS5 proxyList
// SOCKS4 is a constant stream of verified SOCKS4 proxies
SOCKS4 chan *Proxy
SOCKS4 proxyList
// SOCKS4a is a constant stream of verified SOCKS5 proxies
SOCKS4a chan *Proxy
SOCKS4a proxyList
// HTTP is a constant stream of verified SOCKS5 proxies
HTTP chan *Proxy
HTTP proxyList
}
// Slice returns a slice of all proxyLists in ProxyChannels, note that HTTP is not included.
func (pc ProxyChannels) Slice() []*proxyList {
lists := []*proxyList{&pc.SOCKS5, &pc.SOCKS4, &pc.SOCKS4a}
entropy.GetOptimizedRand().Shuffle(len(pc.Slice()), func(i, j int) {
lists[i], lists[j] = lists[j], lists[i]
})
return lists
}
// ProxyEngine represents a proxy pool
@ -37,7 +70,7 @@ type ProxyEngine struct {
Status uint32
// Pending is a constant stream of proxy strings to be verified
Pending chan *Proxy
Pending proxyList
// see: https://pkg.go.dev/github.com/yunginnanet/Rate5
useProx *rl.Limiter
@ -50,17 +83,21 @@ type ProxyEngine struct {
ctx context.Context
quit context.CancelFunc
httpOptsDirty *atomic.Bool
httpClients *sync.Pool
proxyMap proxyMap
// reaper sync.Pool
mu *sync.RWMutex
pool *ants.Pool
recycleMu *sync.Mutex
mu *sync.RWMutex
pool *ants.Pool
scaler *scaler.AutoScaler
scaleTimer *time.Ticker
recyleTimer *time.Ticker
recycleTimer *time.Ticker
opt *config
runningdaemons int32
@ -101,6 +138,8 @@ func defOpt() *config {
stale: defaultStaleTime,
maxWorkers: defaultWorkerCount,
redact: false,
tlsVerify: false,
shuffle: true,
}
sm.validationTimeout = time.Duration(18) * time.Second
sm.serverTimeout = time.Duration(180) * time.Second
@ -134,8 +173,10 @@ type config struct {
recycle bool
// remove proxy from recycling after being marked bad this many times
removeafter int
// shuffle determines whether or not we shuffle proxies before we validate and dispense them.
// shuffle bool
// shuffle determines whether or not we shuffle proxies when we recycle them.
shuffle bool
// tlsVerify determines whether or not we verify the TLS certificate of the endpoints the http client connects to.
tlsVerify bool
// TODO: make getters and setters for these
useProxConfig rl.Policy
@ -167,19 +208,27 @@ func NewProxyEngine() *ProxyEngine {
opt: defOpt(),
conductor: make(chan bool),
mu: &sync.RWMutex{},
Status: uint32(StateNew),
conductor: make(chan bool),
mu: &sync.RWMutex{},
recycleMu: &sync.Mutex{},
httpOptsDirty: &atomic.Bool{},
Status: uint32(stateNew),
}
pe.httpOptsDirty.Store(false)
pe.httpClients = &sync.Pool{New: func() interface{} { return pe.newHTTPClient() }}
stats := []int64{pe.stats.Valid4, pe.stats.Valid4a, pe.stats.Valid5, pe.stats.ValidHTTP, pe.stats.Dispensed}
for i := range stats {
atomic.StoreInt64(&stats[i], 0)
}
chans := []*chan *Proxy{&pe.Valids.SOCKS5, &pe.Valids.SOCKS4, &pe.Valids.SOCKS4a, &pe.Valids.HTTP, &pe.Pending}
for _, c := range chans {
*c = make(chan *Proxy, 100000)
lists := []*proxyList{&pe.Valids.SOCKS5, &pe.Valids.SOCKS4, &pe.Valids.SOCKS4a, &pe.Valids.HTTP, &pe.Pending}
for _, c := range lists {
*c = proxyList{
List: &list.List{},
RWMutex: &sync.RWMutex{},
}
}
pe.dispenseMiddleware = func(p *Proxy) (*Proxy, bool) {
@ -189,7 +238,7 @@ func NewProxyEngine() *ProxyEngine {
pe.conCtx, pe.killConns = context.WithCancel(context.Background())
pe.proxyMap = newProxyMap(pe)
atomic.StoreUint32(&pe.Status, uint32(StateNew))
atomic.StoreUint32(&pe.Status, uint32(stateNew))
atomic.StoreInt32(&pe.runningdaemons, 0)
pe.useProx = rl.NewCustomLimiter(pe.opt.useProxConfig)
@ -203,7 +252,7 @@ func NewProxyEngine() *ProxyEngine {
pe.scaler = scaler.NewAutoScaler(pe.opt.maxWorkers, pe.opt.maxWorkers+100, 50)
pe.scaleTimer = time.NewTicker(750 * time.Millisecond)
pe.recyleTimer = time.NewTicker(100 * time.Millisecond)
pe.recycleTimer = time.NewTicker(100 * time.Millisecond)
if err != nil {
buf := strs.Get()

View File

@ -5,90 +5,96 @@ import (
"time"
)
func (p5 *ProxyEngine) getSocksStr(proto ProxyProtocol) string {
var sock *Proxy
var list *proxyList
switch proto {
case ProtoSOCKS4:
list = &p5.Valids.SOCKS4
case ProtoSOCKS4a:
list = &p5.Valids.SOCKS4a
case ProtoSOCKS5:
list = &p5.Valids.SOCKS5
case ProtoHTTP:
list = &p5.Valids.HTTP
}
for {
if list.Len() == 0 {
p5.recycling()
time.Sleep(250 * time.Millisecond)
continue
}
list.Lock()
sock = list.Remove(list.Front()).(*Proxy)
list.Unlock()
switch {
case sock == nil:
p5.recycling()
time.Sleep(250 * time.Millisecond)
continue
case !p5.stillGood(sock):
sock = nil
continue
default:
p5.stats.dispense()
return sock.Endpoint
}
}
}
// Socks5Str gets a SOCKS5 proxy that we have fully verified (dialed and then retrieved our IP address from a what-is-my-ip endpoint.
// Will block if one is not available!
func (p5 *ProxyEngine) Socks5Str() string {
for {
select {
case sock := <-p5.Valids.SOCKS5:
if !p5.stillGood(sock) {
continue
}
p5.stats.dispense()
return sock.Endpoint
default:
p5.recycling()
}
}
return p5.getSocksStr(ProtoSOCKS5)
}
// Socks4Str gets a SOCKS4 proxy that we have fully verified.
// Will block if one is not available!
func (p5 *ProxyEngine) Socks4Str() string {
defer p5.stats.dispense()
for {
select {
case sock := <-p5.Valids.SOCKS4:
if !p5.stillGood(sock) {
continue
}
return sock.Endpoint
default:
p5.recycling()
}
}
return p5.getSocksStr(ProtoSOCKS4)
}
// Socks4aStr gets a SOCKS4 proxy that we have fully verified.
// Will block if one is not available!
func (p5 *ProxyEngine) Socks4aStr() string {
defer p5.stats.dispense()
for {
select {
case sock := <-p5.Valids.SOCKS4a:
if !p5.stillGood(sock) {
continue
}
return sock.Endpoint
default:
p5.recycling()
}
}
return p5.getSocksStr(ProtoSOCKS4a)
}
// GetHTTPTunnel checks for an available HTTP CONNECT proxy in our pool.
// For now, this function does not loop forever like the GetAnySOCKS does.
// Alternatively it can be included within the for loop by passing true to GetAnySOCKS.
// If there is an HTTP proxy available, ok will be true. If not, it will return false without delay.
func (p5 *ProxyEngine) GetHTTPTunnel() (p *Proxy, ok bool) {
select {
case httptunnel := <-p5.Valids.HTTP:
return httptunnel, true
default:
return nil, false
}
func (p5 *ProxyEngine) GetHTTPTunnel() string {
return p5.getSocksStr(ProtoHTTP)
}
// GetAnySOCKS retrieves any version SOCKS proxy as a Proxy type
// Will block if one is not available!
func (p5 *ProxyEngine) GetAnySOCKS() *Proxy {
var sock *Proxy
defer p5.stats.dispense()
for {
var sock *Proxy
select {
case sock = <-p5.Valids.SOCKS4:
break
case sock = <-p5.Valids.SOCKS4a:
break
case sock = <-p5.Valids.SOCKS5:
break
case <-p5.ctx.Done():
return nil
default:
time.Sleep(250 * time.Millisecond)
p5.recycling()
//
}
if p5.stillGood(sock) {
return sock
for _, list := range p5.Valids.Slice() {
list.RLock()
if list.Len() > 0 {
list.RUnlock()
sock = list.pop()
switch {
case sock == nil:
p5.recycling()
time.Sleep(50 * time.Millisecond)
case p5.stillGood(sock):
return sock
default:
sock = nil
}
continue
}
list.RUnlock()
}
}
}
@ -125,7 +131,7 @@ func (p5 *ProxyEngine) stillGood(sock *Proxy) bool {
buf.MustWriteString("proxy stale: ")
buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf)
go p5.stats.stale()
p5.stats.stale()
return false
}

View File

@ -48,7 +48,7 @@ func init() {
socklog.Printf("[USAGE] q: quit | d: debug | p: pause/unpause")
}
const statsFmt = ">>>>>-----<<<<<\n>>>>>Prox5<<<<<\n>>>>>-----<<<<<\n\nUptime: %s\n\nValidated: %d\nDispensed: %d\n\nMaximum Workers: %d\nActive Workers: %d\nAsleep Workers: %d\n\nAutoScale: %s\n\n----------\n%s"
const statsFmt = ">>>>>-----<<<<<\n>>>>>Prox5<<<<<\n>>>>>-----<<<<<\n\nUptime: %s\n\nValidated: %d\nDispensed: %d\n\nMaximum Workers: %d\nActive Workers: %d\nAsleep Workers: %d\n\nAutoScale: %s\nSOCKS5 listening on 127.0.0.1:1555\n\n----------\n%s"
var (
background *tview.TextView
@ -102,29 +102,27 @@ func (s socksLogger) Print(str string) {
}
func buttons(buttonIndex int, buttonLabel string) {
go func() {
switch buttonIndex {
case 0:
app.Stop()
case 1:
if swamp.IsRunning() {
err := swamp.Pause()
if err != nil {
socklog.Printf(err.Error())
}
} else {
if err := swamp.Resume(); err != nil {
socklog.Printf(err.Error())
}
switch buttonIndex {
case 0:
app.Stop()
case 1:
if swamp.IsRunning() {
err := swamp.Pause()
if err != nil {
socklog.Printf(err.Error())
}
} else {
if err := swamp.Resume(); err != nil {
socklog.Printf(err.Error())
}
case 2:
swamp.SetMaxWorkers(swamp.GetMaxWorkers() + 1)
case 3:
swamp.SetMaxWorkers(swamp.GetMaxWorkers() - 1)
default:
app.Stop()
}
}()
case 2:
swamp.SetMaxWorkers(swamp.GetMaxWorkers() + 1)
case 3:
swamp.SetMaxWorkers(swamp.GetMaxWorkers() - 1)
default:
app.Stop()
}
}
func main() {
@ -132,7 +130,7 @@ func main() {
go func() {
for {
time.Sleep(250 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
app.QueueUpdateDraw(func() {
window.SetText(currentString(""))
})

View File

@ -121,13 +121,11 @@ func (p5 *ProxyEngine) GetDispenseMiddleware() func(*Proxy) (*Proxy, bool) {
return p5.dispenseMiddleware
}
// TODO: List shuffling
/*func (p5 *ProxyEngine) GetShuffleStatus() bool {
func (p5 *ProxyEngine) GetRecyclerShuffleStatus() bool {
p5.mu.RLock()
defer p5.mu.RUnlock()
return p5.opt.shuffle
}*/
}
func (p5 *ProxyEngine) GetAutoScalerStatus() bool {
return p5.scaler.IsOn()
@ -142,3 +140,9 @@ func (p5 *ProxyEngine) GetDebugRedactStatus() bool {
defer p5.mu.RUnlock()
return p5.opt.redact
}
func (p5 *ProxyEngine) GetHTTPTLSVerificationStatus() bool {
p5.mu.RLock()
defer p5.mu.RUnlock()
return p5.opt.tlsVerify
}

18
go.mod
View File

@ -4,7 +4,7 @@ go 1.19
require (
git.tcp.direct/kayos/common v0.8.0
git.tcp.direct/kayos/go-socks5 v0.3.0
git.tcp.direct/kayos/go-socks5 v1.0.4
git.tcp.direct/kayos/socks v0.1.1
github.com/gdamore/tcell/v2 v2.5.4
github.com/miekg/dns v1.1.50
@ -12,7 +12,7 @@ require (
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/panjf2000/ants/v2 v2.7.1
github.com/refraction-networking/utls v1.2.0
github.com/rivo/tview v0.0.0-20230104153304-892d1a2eb0da
github.com/rivo/tview v0.0.0-20230130130022-4a1b7a76c01c
github.com/yunginnanet/Rate5 v1.2.1
golang.org/x/net v0.5.0
inet.af/netaddr v0.0.0-20220811202034-502d2d690317
@ -21,24 +21,24 @@ require (
require (
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/gdamore/encoding v1.0.0 // indirect
github.com/klauspost/compress v1.15.15 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/rivo/uniseg v0.4.3 // indirect
go4.org/intern v0.0.0-20220617035311-6925f38cc365 // indirect
github.com/rivo/uniseg v0.4.2 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/tools v0.5.0 // indirect
golang.org/x/tools v0.1.12 // indirect
nullprogram.com/x/rng v1.1.0 // indirect
)
retract (
v0.8.4
v1.2.2 // cleanup
v1.2.1 // accident
v0.8.4
)

29
go.sum
View File

@ -1,9 +1,5 @@
git.tcp.direct/kayos/common v0.8.0 h1:7Nl44HAKQU5jvHb2eJgU5cwbOLfft3P/XTxZHTMPfyo=
git.tcp.direct/kayos/common v0.8.0/go.mod h1:r7lZuKTQz0uf/jNm61sz1XaMgK/RYRr7wtqr/cNYd8o=
git.tcp.direct/kayos/go-socks5 v0.3.0 h1:nCsYM0ttPZHGAVVG8zFEy2ZTxoSyPp5ld1YSy3zyWDQ=
git.tcp.direct/kayos/go-socks5 v0.3.0/go.mod h1:6Dw8lhiA+dqCY6CvPBtMyXMug9+spSAXC7+lrqItXf0=
git.tcp.direct/kayos/go-socks5 v1.0.3 h1:oXz0Uv9OYt9b2N/mM0uarU+320ZDX5Aa3S7r2s522es=
git.tcp.direct/kayos/go-socks5 v1.0.3/go.mod h1:OUpGQQzCqJLMzc008+f2BY4bWGgxUKDUOV/chWXMUBQ=
git.tcp.direct/kayos/go-socks5 v1.0.4 h1:2x2H0TnKfOc7sat12WNF+uqZLKvPCoKkWdxPfJXUKJM=
git.tcp.direct/kayos/go-socks5 v1.0.4/go.mod h1:smPlQaSGcQR+Kf6wfJkI2nskxWPgtxGIh1beMD+cwB8=
git.tcp.direct/kayos/socks v0.1.1 h1:2xcHVHA2u4sSEPq+wIV6qmb/fgxEfFHdRAh5hTPL0NM=
@ -19,8 +15,8 @@ github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo
github.com/gdamore/tcell/v2 v2.5.4 h1:TGU4tSjD3sCL788vFNeJnTdzpNKIw1H5dgLnJRQVv/k=
github.com/gdamore/tcell/v2 v2.5.4/go.mod h1:dZgRy5v4iMobMEcWNYBtREnDZAT9DYmfqIkrgEMxLyw=
github.com/h12w/go-socks5 v0.0.0-20200522160539-76189e178364 h1:5XxdakFhqd9dnXoAZy1Mb2R/DZ6D1e+0bGC/JhucGYI=
github.com/klauspost/compress v1.15.15 h1:EF27CXIuDsYJ6mmvtBRlEuB2UVOqHG1tAXgZ7yIO+lw=
github.com/klauspost/compress v1.15.15/go.mod h1:ZcK2JAFqKOpnBlxcLsJzYfrS9X1akm9fHZNnD9+Vo/4=
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
@ -40,11 +36,11 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/refraction-networking/utls v1.2.0 h1:U5f8wkij2NVinfLuJdFP3gCMwIHs+EzvhxmYdXgiapo=
github.com/refraction-networking/utls v1.2.0/go.mod h1:NPq+cVqzH7D1BeOkmOcb5O/8iVewAsiVt2x1/eO0hgQ=
github.com/rivo/tview v0.0.0-20230104153304-892d1a2eb0da h1:3Mh+tcC2KqetuHpWMurDeF+yOgyt4w4qtLIpwSQ3uqo=
github.com/rivo/tview v0.0.0-20230104153304-892d1a2eb0da/go.mod h1:lBUy/T5kyMudFzWUH/C2moN+NlU5qF505vzOyINXuUQ=
github.com/rivo/tview v0.0.0-20230130130022-4a1b7a76c01c h1:zIYU4PjQJ4BnYryMmpyizt1Un13V0ToCMXvC05DK8xc=
github.com/rivo/tview v0.0.0-20230130130022-4a1b7a76c01c/go.mod h1:lBUy/T5kyMudFzWUH/C2moN+NlU5qF505vzOyINXuUQ=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.3 h1:utMvzDsuh3suAEnhH0RdHmoPbU648o6CvXxTx4SBMOw=
github.com/rivo/uniseg v0.4.3/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rivo/uniseg v0.4.2 h1:YwD0ulJSJytLpiaWua0sBDusfsCZohxjxzVTYjwxfV8=
github.com/rivo/uniseg v0.4.2/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
@ -57,9 +53,8 @@ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yunginnanet/Rate5 v1.2.1 h1:6G8j0RLZofS76l6cMvqa8v6yeGbn2yPNk0xqH6EXcnU=
github.com/yunginnanet/Rate5 v1.2.1/go.mod h1:f0r66kVQZojRqUgVdLC/CKexMlF0nUDAmd01tBeF4Ms=
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
go4.org/intern v0.0.0-20211027215823-ae77deb06f29/go.mod h1:cS2ma+47FKrLPdXFpr7CuxiTW3eyJbWew4qx0qtQWDA=
go4.org/intern v0.0.0-20220617035311-6925f38cc365 h1:t9hFvR102YlOqU0fQn1wgwhNvSbHGBbbJxX9JKfU3l0=
go4.org/intern v0.0.0-20220617035311-6925f38cc365/go.mod h1:WXRv3p7T6gzt0CcJm43AAKdKVZmcQbwwC7EwquU5BZU=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20211027215541-db492cf91b37/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 h1:FyBZqvoA/jbNzuAWLQE2kG820zMAkcilx6BMjGbL/E4=
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760/go.mod h1:FftLjUGFEDu5k8lt0ddY+HcrH/qU/0qk+H8j9/nTl3E=
@ -67,13 +62,12 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.5.0 h1:U/0M97KRkSFvyD/3FSmdP5W5swImpNgle/EHFhOsQPE=
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@ -118,9 +112,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.6-0.20210726203631-07bc1bf47fb2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.5.0 h1:+bSpV5HIeWkuvgaMfI3UmKRThoTA5ODJTUd8T17NO+4=
golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -59,9 +59,11 @@ func (p5 *ProxyEngine) LoadSingleProxy(sock string) bool {
}
func (p5 *ProxyEngine) loadSingleProxy(sock string) error {
if _, ok := p5.proxyMap.add(sock); !ok {
p, ok := p5.proxyMap.add(sock)
if !ok {
return errors.New("proxy already exists")
}
p5.Pending.add(p)
return nil
}

View File

@ -3,25 +3,45 @@ package prox5
import (
"crypto/tls"
"net/http"
"sync"
)
// GetHTTPClient retrieves a pointer to an http.Client powered by mysteryDialer.
func (p5 *ProxyEngine) GetHTTPClient() *http.Client {
func (p5 *ProxyEngine) newHTTPClient() any {
return &http.Client{
Transport: &http.Transport{
DialContext: p5.DialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSClientConfig: &tls.Config{InsecureSkipVerify: p5.GetHTTPTLSVerificationStatus()},
// TLSHandshakeTimeout: p5.GetServerTimeout(),
DisableKeepAlives: true,
DisableCompression: false,
MaxIdleConnsPerHost: 5,
// IdleConnTimeout: p5.GetServerTimeout(),
// ResponseHeaderTimeout: p5.GetServerTimeout(),
DisableKeepAlives: true,
DisableCompression: false,
// MaxIdleConnsPerHost: 5,
MaxConnsPerHost: 0,
IdleConnTimeout: 0,
ResponseHeaderTimeout: 0,
ExpectContinueTimeout: 0,
TLSNextProto: nil,
ProxyConnectHeader: nil,
GetProxyConnectHeader: nil,
MaxResponseHeaderBytes: 0,
WriteBufferSize: 0,
ReadBufferSize: 0,
ForceAttemptHTTP2: false,
},
Timeout: p5.GetServerTimeout(),
}
}
// GetHTTPClient retrieves a pointer to an http.Client powered by mysteryDialer.
func (p5 *ProxyEngine) GetHTTPClient() *http.Client {
if p5.httpOptsDirty.Load() {
p5.httpClients = &sync.Pool{
New: p5.newHTTPClient,
}
p5.httpOptsDirty.Store(false)
}
return p5.httpClients.Get().(*http.Client)
}
// RoundTrip is Mr. WorldWide. Obviously. See: https://pkg.go.dev/net/http#RoundTripper
func (p5 *ProxyEngine) RoundTrip(req *http.Request) (*http.Response, error) {
return p5.GetHTTPClient().Do(req)

View File

@ -23,11 +23,9 @@ func (p5 *ProxyEngine) Dial(network, addr string) (net.Conn, error) {
// DialTimeout is a simple stub adapter to implement a net.Dialer with a timeout.
func (p5 *ProxyEngine) DialTimeout(network, addr string, timeout time.Duration) (net.Conn, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(timeout))
go func() { // this is a goroutine that calls cancel() upon the deadline expiring to avoid context leaks
<-ctx.Done()
cancel()
}()
return p5.mysteryDialer(ctx, network, addr)
nc, err := p5.mysteryDialer(ctx, network, addr)
cancel()
return nc, err
}
func (p5 *ProxyEngine) addTimeout(socksString string) string {
@ -47,19 +45,29 @@ func (p5 *ProxyEngine) popSockAndLockIt(ctx context.Context) (*Proxy, error) {
case <-ctx.Done():
return nil, fmt.Errorf("context done: %w", ctx.Err())
default:
if atomic.CompareAndSwapUint32(&sock.lock, stateUnlocked, stateLocked) {
// p5.msgGotLock(socksString)
return sock, nil
}
select {
case p5.Pending <- sock:
// p5.msgCantGetLock(socksString, true)
return nil, nil
default:
p5.msgCantGetLock(sock.String(), false)
return nil, nil
}
//
}
if sock == nil {
return nil, nil
}
if atomic.CompareAndSwapUint32(&sock.lock, stateUnlocked, stateLocked) {
// p5.msgGotLock(socksString)
return sock, nil
}
switch sock.GetProto() {
case ProtoSOCKS5:
p5.Valids.SOCKS5.add(sock)
case ProtoSOCKS4:
p5.Valids.SOCKS4.add(sock)
case ProtoSOCKS4a:
p5.Valids.SOCKS4a.add(sock)
case ProtoHTTP:
p5.Valids.HTTP.add(sock)
default:
return nil, fmt.Errorf("unknown protocol: %s", sock.GetProto())
}
return nil, nil
}
// mysteryDialer is a dialer function that will use a different proxy for every request.
@ -69,23 +77,25 @@ func (p5 *ProxyEngine) mysteryDialer(ctx context.Context, network, addr string)
var count = 0
for {
max := p5.GetDialerBailout()
if count > max {
switch {
case count > max:
return nil, fmt.Errorf("giving up after %d tries", max)
}
if err := ctx.Err(); err != nil {
return nil, fmt.Errorf("context error: %w", err)
}
if p5.conCtx.Err() != nil {
case ctx.Err() != nil:
return nil, fmt.Errorf("context error: %w", ctx.Err())
case p5.conCtx.Err() != nil:
return nil, fmt.Errorf("context closed")
default:
//
}
var sock *Proxy
for {
if p5.scale() {
time.Sleep(1 * time.Millisecond)
time.Sleep(5 * time.Millisecond)
}
var err error
sock, err = p5.popSockAndLockIt(ctx)
if err != nil {
println(err.Error())
return nil, err
}
if sock != nil {
@ -109,16 +119,16 @@ func (p5 *ProxyEngine) mysteryDialer(ctx context.Context, network, addr string)
continue
}
p5.msgUsingProxy(socksString)
go func() {
select {
case <-ctx.Done():
_ = conn.Close()
case <-p5.conCtx.Done():
_ = conn.Close()
case <-p5.ctx.Done():
_ = conn.Close()
}
}()
/* go func() {
select {
case <-ctx.Done():
_ = conn.Close()
case <-p5.conCtx.Done():
_ = conn.Close()
case <-p5.ctx.Done():
_ = conn.Close()
}
}()*/
return conn, nil
}
}

View File

@ -21,14 +21,13 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
case <-p5.scaleTimer.C:
bad := int64(0)
totalBadNow := p5.GetTotalBad()
totalChanFullNow := atomic.LoadInt64(&p5.stats.timesChannelFull)
accountedFor := atomic.LoadInt64(&p5.stats.badAccounted) + atomic.LoadInt64(&p5.stats.fullChanAccounted)
netFactors := (totalBadNow + totalChanFullNow) - accountedFor
if time.Since(p5.stats.accountingLastDone) > 600*time.Millisecond && netFactors > 0 {
accountedFor := atomic.LoadInt64(&p5.stats.badAccounted)
netFactors := totalBadNow - accountedFor
if time.Since(p5.stats.accountingLastDone) > 5*time.Second && netFactors > 0 {
bad = int64(netFactors)
if p5.DebugEnabled() {
p5.DebugLogger.Printf("accounting: %d bad, %d full, %d accounted, %d net factors",
totalBadNow, totalChanFullNow, accountedFor, netFactors)
p5.DebugLogger.Printf("accounting: %d bad - %d accounted for = %d net factors",
totalBadNow, accountedFor, netFactors)
}
p5.stats.accountingLastDone = time.Now()
}
@ -55,7 +54,6 @@ func (p5 *ProxyEngine) scale() (sleep bool) {
totalValidated,
totalConsidered,
) {
atomic.AddInt64(&p5.stats.fullChanAccounted, 1)
p5.scaleDbg()
}
default:

View File

@ -146,14 +146,26 @@ func (p5 *ProxyEngine) DisableDebugRedaction() {
p5.opt.Unlock()
}
/*func (p5 *ProxyEngine) EnableListShuffle() {
func (p5 *ProxyEngine) EnableRecyclerShuffling() {
p5.opt.Lock()
p5.opt.shuffle = true
p5.opt.Unlock()
}
func (p5 *ProxyEngine) DisableListShuffle() {
func (p5 *ProxyEngine) DisableRecyclerShuffling() {
p5.opt.Lock()
p5.opt.shuffle = false
p5.opt.Unlock()
}*/
}
func (p5 *ProxyEngine) EnableHTTPClientTLSVerification() {
p5.opt.Lock()
p5.opt.tlsVerify = true
p5.opt.Unlock()
}
func (p5 *ProxyEngine) DisableHTTPClientTLSVerification() {
p5.opt.Lock()
p5.opt.tlsVerify = false
p5.opt.Unlock()
}

View File

@ -37,7 +37,7 @@ func (p5 *ProxyEngine) StartSOCKS5Server(listen, username, password string) erro
opts := []socks5.Option{
socks5.WithBufferPool(bufs),
socks5.WithLogger(p5.DebugLogger),
socks5.WithDial(p5.mysteryDialer),
socks5.WithDial(p5.DialContext),
}
if username != "" && password != "" {
cator := socks5.UserPassAuthenticator{Credentials: socks5.StaticCredentials{username: password}}

View File

@ -24,11 +24,8 @@ type Statistics struct {
// birthday represents the time we started checking proxies with this pool
birthday time.Time
timesChannelFull int64
fullChanAccounted int64
lastWarnedChannelFull time.Time
badAccounted int64
accountingLastDone time.Time
badAccounted int64
accountingLastDone time.Time
}
func (stats *Statistics) dispense() {

View File

@ -190,6 +190,12 @@ func (sock *Proxy) validate() {
}
defer atomic.StoreUint32(&sock.lock, stateUnlocked)
select {
case <-sock.parent.ctx.Done():
return
default:
}
pe := sock.parent
if pe.useProx.Check(sock) {
// s.dbgPrint("useProx ratelimited: " + sock.Endpoint )
@ -204,7 +210,8 @@ func (sock *Proxy) validate() {
// TODO: consider giving the option for verbose logging of this stuff?
if sock.timesValidated == 0 || sock.protocol.Get() == ProtoNull {
switch {
case sock.timesValidated == 0, sock.protocol.Get() == ProtoNull:
// try to use the proxy with all 3 SOCKS versions
for tryProto := range protoMap {
if tryProto == ProtoNull {
@ -222,7 +229,7 @@ func (sock *Proxy) validate() {
break
}
}
} else {
default:
if err := pe.singleProxyCheck(sock, sock.GetProto()); err != nil {
sock.bad()
pe.badProx.Check(sock)
@ -245,7 +252,7 @@ func (sock *Proxy) validate() {
}
func (p5 *ProxyEngine) tally(sock *Proxy) bool {
var target chan *Proxy
var target proxyList
switch sock.protocol.Get() {
case ProtoSOCKS4:
p5.stats.v4()
@ -262,8 +269,8 @@ func (p5 *ProxyEngine) tally(sock *Proxy) bool {
default:
return false
}
select {
case target <- sock:
return true
}
target.Lock()
target.PushBack(sock)
target.Unlock()
return true
}