prox5/dispense.go
kayos 471f7690d2
dev->staging (#91)
* Fix: Didn't unlock ._.

* Refactor: reduce complexity

* Fix: part of my commit message ended up in the commit :^)

* Fix: mutex locking snafu

* Fix: if proxy disqualified then it's not still good

* Fix race cond. in socks lib + add tests + update uagents

* Chore: tidy up

* Fix: bad var name

* Testing: Make integration test more realistic

* Update CI

* Fix go vet: returning fatal from goroutine

* Fix: remove gotrace

* CI: Add PR summarizer

* Chore: gomod

* Chore[CI]: fix branch in workflow
2023-08-11 22:51:49 -07:00

139 lines
3.0 KiB
Go

package prox5
import (
"sync/atomic"
"time"
)
// getSocksStr pops proxies from the validated list matching proto until one
// passes stillGood, then records the dispense in stats and returns that
// proxy's Endpoint.
//
// It blocks for as long as no usable proxy is available: whenever nothing can
// be taken from the list it calls p5.recycling() and sleeps 250ms before
// retrying. A proto with no backing list yields "" immediately.
func (p5 *ProxyEngine) getSocksStr(proto ProxyProtocol) string {
	var list *proxyList
	switch proto {
	case ProtoSOCKS4:
		list = &p5.Valids.SOCKS4
	case ProtoSOCKS4a:
		list = &p5.Valids.SOCKS4a
	case ProtoSOCKS5:
		list = &p5.Valids.SOCKS5
	case ProtoHTTP:
		list = &p5.Valids.HTTP
	default:
		// No list to draw from; the loop below would dereference a nil list.
		return ""
	}
	for {
		var sock *Proxy
		// Look up and remove the front element under a single lock hold so a
		// concurrent consumer cannot empty the list between the emptiness
		// check and the removal; Remove with a nil element would panic.
		list.Lock()
		if front := list.Front(); front != nil {
			// Comma-ok: an unexpected element type leaves sock nil instead of panicking.
			sock, _ = list.Remove(front).(*Proxy)
		}
		list.Unlock()
		switch {
		case sock == nil:
			// Nothing usable was available; ask for a refill and back off.
			p5.recycling()
			time.Sleep(250 * time.Millisecond)
		case p5.stillGood(sock):
			p5.stats.dispense()
			return sock.Endpoint
		}
		// A proxy that failed stillGood is dropped; try the next one right away.
	}
}
// Socks5Str gets a SOCKS5 proxy that we have fully verified (dialed and then retrieved our IP address from a what-is-my-ip endpoint).
// The return value is the Endpoint string of the proxy handed out by getSocksStr(ProtoSOCKS5).
// Will block if one is not available!
func (p5 *ProxyEngine) Socks5Str() string {
return p5.getSocksStr(ProtoSOCKS5)
}
// Socks4Str gets a SOCKS4 proxy that we have fully verified.
// The return value is the Endpoint string of the proxy handed out by getSocksStr(ProtoSOCKS4).
// Will block if one is not available!
func (p5 *ProxyEngine) Socks4Str() string {
return p5.getSocksStr(ProtoSOCKS4)
}
// Socks4aStr gets a SOCKS4a proxy that we have fully verified.
// The return value is the Endpoint string of the proxy handed out by getSocksStr(ProtoSOCKS4a).
// Will block if one is not available!
func (p5 *ProxyEngine) Socks4aStr() string {
return p5.getSocksStr(ProtoSOCKS4a)
}
// GetHTTPTunnel gets an HTTP CONNECT proxy from our pool.
// The return value is the Endpoint string of the proxy handed out by getSocksStr(ProtoHTTP).
// Will block if one is not available!
func (p5 *ProxyEngine) GetHTTPTunnel() string {
return p5.getSocksStr(ProtoHTTP)
}
// GetAnySOCKS retrieves any version SOCKS proxy as a Proxy type, drawing from
// the lists returned by p5.Valids.Slice().
// Will block if one is not available!
//
// It returns nil once p5.ctx is done. A dispense is recorded in stats only
// when a proxy is actually returned.
func (p5 *ProxyEngine) GetAnySOCKS() *Proxy {
	for {
		select {
		case <-p5.ctx.Done():
			return nil
		default:
			// Brief pause on every pass so the outer loop does not spin.
			time.Sleep(2 * time.Millisecond)
		}
		for _, list := range p5.Valids.Slice() {
			list.RLock()
			empty := list.Len() < 1
			list.RUnlock()
			if empty {
				// Back off only after releasing the read lock so writers
				// are not stalled while we sleep.
				time.Sleep(15 * time.Millisecond)
				continue
			}
			sock := list.pop()
			switch {
			case sock == nil:
				// The list produced nothing; ask for a refill and back off.
				p5.recycling()
				time.Sleep(50 * time.Millisecond)
			case p5.stillGood(sock):
				p5.stats.dispense()
				return sock
			}
		}
	}
}
// stillGood reports whether sock is currently fit to be handed out.
//
// It returns false when:
//   - sock is nil;
//   - sock.lock cannot be swapped from stateUnlocked to stateLocked;
//   - GetRemoveAfter is not -1 and sock.timesBad exceeds it, in which case
//     the proxy is also deleted from proxyMap;
//   - badProx.Peek reports the proxy;
//   - the time since sock.lastValidated exceeds p5.opt.stale, which is also
//     counted in stats.
//
// The lock word is reset to stateUnlocked before returning whenever the swap
// succeeded.
func (p5 *ProxyEngine) stillGood(sock *Proxy) bool {
	if sock == nil || !atomic.CompareAndSwapUint32(&sock.lock, stateUnlocked, stateLocked) {
		return false
	}
	defer atomic.StoreUint32(&sock.lock, stateUnlocked)

	// Cases are evaluated top to bottom, and the first match disqualifies the proxy.
	switch {
	case p5.GetRemoveAfter() != -1 && atomic.LoadInt64(&sock.timesBad) > int64(p5.GetRemoveAfter()):
		msg := strs.Get()
		msg.MustWriteString("deleting from map (too many failures): ")
		msg.MustWriteString(sock.Endpoint)
		p5.dbgPrint(msg)
		if delErr := p5.proxyMap.delete(sock.Endpoint); delErr != nil {
			p5.dbgPrint(simpleString(delErr.Error()))
		}
		return false
	case p5.badProx.Peek(sock):
		p5.msgBadProxRate(sock)
		return false
	case time.Since(sock.lastValidated) > p5.opt.stale:
		msg := strs.Get()
		msg.MustWriteString("proxy stale: ")
		msg.MustWriteString(sock.Endpoint)
		p5.dbgPrint(msg)
		p5.stats.stale()
		return false
	default:
		return true
	}
}