dev->staging (#91)

* Fix: Didn't unlock ._.

* Refactor: reduce complexity

* Fix: part of my commit message ended up in the commit :^)

* Fix: mutex locking snafu

* Fix: if proxy disqualified then it's not still good

* Fix race cond. in socks lib + add tests + update uagents

* Chore: tidy up

* Fix: bad var name

* Testing: Make integration test more realistic

* Update CI

* Fix go vet: returning fatal from goroutine

* Fix: remove gotrace

* CI: Add PR summarizer

* Chore: gomod

* Chore[CI]: fix branch in workflow
This commit is contained in:
kayos 2023-08-11 22:51:49 -07:00 committed by GitHub
parent 2ae83ec14f
commit 471f7690d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 413 additions and 70 deletions

View File

@ -20,9 +20,10 @@ const (
// Start starts our proxy pool operations. Trying to start a running ProxyEngine will return an error.
func (p5 *ProxyEngine) Start() error {
if atomic.LoadUint32(&p5.Status) != uint32(stateNew) {
p5.dbgPrint(simpleString("proxy pool has been started before, resuming instead"))
p5.DebugLogger.Printf("proxy pool has been started before, resuming instead")
return p5.Resume()
}
p5.DebugLogger.Printf("starting prox5")
p5.startDaemons()
return nil
}
@ -51,6 +52,7 @@ func (p5 *ProxyEngine) Pause() error {
func (p5 *ProxyEngine) startDaemons() {
go p5.jobSpawner()
atomic.StoreUint32(&p5.Status, uint32(stateRunning))
p5.DebugLogger.Printf("prox5 started")
}
// Resume will resume pause proxy pool operations, attempting to resume a running ProxyEngine is returns an error.

View File

@ -60,15 +60,6 @@ func (p5 *ProxyEngine) recycling() int {
var count = 0
tpls := make(chan cmap.Tuple[string, *Proxy], p5.proxyMap.plot.Count())
recycleTuples := func(items chan cmap.Tuple[string, *Proxy]) {
for tuple := range items {
p5.Pending.add(tuple.Val)
count++
}
}
switch p5.GetRecyclerShuffleStatus() {
case true:
var tuples []cmap.Tuple[string, *Proxy]
@ -79,16 +70,16 @@ func (p5 *ProxyEngine) recycling() int {
tuples[i], tuples[j] = tuples[j], tuples[i]
})
for _, tuple := range tuples {
tpls <- tuple
p5.Pending.add(tuple.Val)
count++
}
case false:
for tuple := range p5.proxyMap.plot.IterBuffered() {
tpls <- tuple
p5.Pending.add(tuple.Val)
count++
}
}
recycleTuples(tpls)
return count
}

View File

@ -38,7 +38,11 @@ func (s SocksLogger) Printf(format string, a ...interface{}) {
type basicPrinter struct{}
func (b *basicPrinter) Print(str string) {
func (b *basicPrinter) Print(a ...any) {
if len(a) == 0 {
return
}
str := fmt.Sprint(a...)
if useDebugChannel {
debugChan <- str
return
@ -46,7 +50,6 @@ func (b *basicPrinter) Print(str string) {
buf := strs.Get()
buf.MustWriteString(stamp)
buf.MustWriteString(str)
println(buf.String())
strs.MustPut(buf)
}
@ -68,6 +71,7 @@ func (p5 *ProxyEngine) DebugEnabled() bool {
// EnableDebug enables printing of verbose messages during operation
func (p5 *ProxyEngine) EnableDebug() {
atomic.StoreUint32(debugStatus, debugEnabled)
p5.dbgPrint(simpleString("prox5 debug enabled"))
}
// DisableDebug enables printing of verbose messages during operation.
@ -87,7 +91,7 @@ func (p5 *ProxyEngine) dbgPrint(builder *pool.String) {
if !p5.DebugEnabled() {
return
}
p5.DebugLogger.Print(builder.String())
p5.DebugLogger.Printf(builder.String())
return
}
@ -205,6 +209,9 @@ func (p5 *ProxyEngine) msgBadProxRate(sock *Proxy) {
if !p5.DebugEnabled() {
return
}
if p5.lastBadProxAnnnounced.Load().(string) == sock.Endpoint {
return
}
sockString := sock.Endpoint
if p5.GetDebugRedactStatus() {
sockString = "(redacted)"
@ -213,6 +220,7 @@ func (p5 *ProxyEngine) msgBadProxRate(sock *Proxy) {
buf.MustWriteString("badProx ratelimited: ")
buf.MustWriteString(sockString)
p5.dbgPrint(buf)
p5.lastBadProxAnnnounced.Store(sock.Endpoint)
}
// ------------

128
defs.go
View File

@ -68,7 +68,7 @@ type ProxyEngine struct {
stats *Statistics
Status uint32
// Pending is a constant stream of proxy strings to be verified
Pending proxyList
@ -99,6 +99,8 @@ type ProxyEngine struct {
recycleTimer *time.Ticker
lastBadProxAnnnounced *atomic.Value
opt *config
runningdaemons int32
conductor chan bool
@ -141,8 +143,8 @@ func defOpt() *config {
tlsVerify: false,
shuffle: true,
}
sm.validationTimeout = time.Duration(18) * time.Second
sm.serverTimeout = time.Duration(180) * time.Second
sm.validationTimeout = time.Duration(9) * time.Second
sm.serverTimeout = time.Duration(15) * time.Second
return sm
}
@ -202,11 +204,12 @@ type Swamp struct {
// NewProxyEngine returns a ProxyEngine with default options.
// After calling this you may use the various "setters" to change the options before calling ProxyEngine.Start().
func NewProxyEngine() *ProxyEngine {
pe := &ProxyEngine{
p5 := &ProxyEngine{
stats: &Statistics{birthday: time.Now()},
DebugLogger: &basicPrinter{},
opt: defOpt(),
opt: defOpt(),
lastBadProxAnnnounced: &atomic.Value{},
conductor: make(chan bool),
mu: &sync.RWMutex{},
@ -215,15 +218,16 @@ func NewProxyEngine() *ProxyEngine {
Status: uint32(stateNew),
}
pe.httpOptsDirty.Store(false)
pe.httpClients = &sync.Pool{New: func() interface{} { return pe.newHTTPClient() }}
p5.lastBadProxAnnnounced.Store("")
p5.httpOptsDirty.Store(false)
p5.httpClients = &sync.Pool{New: func() interface{} { return p5.newHTTPClient() }}
stats := []int64{pe.stats.Valid4, pe.stats.Valid4a, pe.stats.Valid5, pe.stats.ValidHTTP, pe.stats.Dispensed}
stats := []int64{p5.stats.Valid4, p5.stats.Valid4a, p5.stats.Valid5, p5.stats.ValidHTTP, p5.stats.Dispensed}
for i := range stats {
atomic.StoreInt64(&stats[i], 0)
}
lists := []*proxyList{&pe.Valids.SOCKS5, &pe.Valids.SOCKS4, &pe.Valids.SOCKS4a, &pe.Valids.HTTP, &pe.Pending}
lists := []*proxyList{&p5.Valids.SOCKS5, &p5.Valids.SOCKS4, &p5.Valids.SOCKS4a, &p5.Valids.HTTP, &p5.Pending}
for _, c := range lists {
*c = proxyList{
List: &list.List{},
@ -231,38 +235,38 @@ func NewProxyEngine() *ProxyEngine {
}
}
pe.dispenseMiddleware = func(p *Proxy) (*Proxy, bool) {
p5.dispenseMiddleware = func(p *Proxy) (*Proxy, bool) {
return p, true
}
pe.ctx, pe.quit = context.WithCancel(context.Background())
pe.conCtx, pe.killConns = context.WithCancel(context.Background())
pe.proxyMap = newProxyMap(pe)
p5.ctx, p5.quit = context.WithCancel(context.Background())
p5.conCtx, p5.killConns = context.WithCancel(context.Background())
p5.proxyMap = newProxyMap(p5)
atomic.StoreUint32(&pe.Status, uint32(stateNew))
atomic.StoreInt32(&pe.runningdaemons, 0)
atomic.StoreUint32(&p5.Status, uint32(stateNew))
atomic.StoreInt32(&p5.runningdaemons, 0)
pe.useProx = rl.NewCustomLimiter(pe.opt.useProxConfig)
pe.badProx = rl.NewCustomLimiter(pe.opt.badProxConfig)
p5.useProx = rl.NewCustomLimiter(p5.opt.useProxConfig)
p5.badProx = rl.NewCustomLimiter(p5.opt.badProxConfig)
var err error
pe.pool, err = ants.NewPool(pe.opt.maxWorkers, ants.WithOptions(ants.Options{
p5.pool, err = ants.NewPool(p5.opt.maxWorkers, ants.WithOptions(ants.Options{
ExpiryDuration: 2 * time.Minute,
PanicHandler: pe.pondPanic,
PanicHandler: p5.pondPanic,
}))
pe.scaler = scaler.NewAutoScaler(pe.opt.maxWorkers, pe.opt.maxWorkers+100, 50)
pe.scaleTimer = time.NewTicker(1 * time.Second)
pe.recycleTimer = time.NewTicker(500 * time.Millisecond)
p5.scaler = scaler.NewAutoScaler(p5.opt.maxWorkers, p5.opt.maxWorkers+100, 50)
p5.scaleTimer = time.NewTicker(1 * time.Second)
p5.recycleTimer = time.NewTicker(500 * time.Millisecond)
if err != nil {
buf := strs.Get()
buf.MustWriteString("CRITICAL: ")
buf.MustWriteString(err.Error())
pe.dbgPrint(buf)
p5.dbgPrint(buf)
panic(err)
}
return pe
return p5
}
func newProxyMap(pe *ProxyEngine) proxyMap {
@ -279,24 +283,60 @@ func (p5 *ProxyEngine) pondPanic(p interface{}) {
// defaultUserAgents is a small list of user agents to use during validation.
var defaultUserAgents = []string{
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:60.0) Gecko/20100101 Firefox/60.0",
"Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0",
"Mozilla/5.0 (Windows NT 6.2; Win64; x64; rv:24.0) Gecko/20140419 Firefox/24.0 PaleMoon/24.5.0",
"Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:44.0) Gecko/20100101 Firefox/44.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:49.0) Gecko/20100101 Firefox/49.0",
"Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:55.0) Gecko/20100101 Firefox/55.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:47.0) Gecko/20100101 Firefox/--.0",
"Mozilla/5.0 (Windows NT 6.0; rv:19.0) Gecko/20100101 Firefox/19.0",
"Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:45.0) Gecko/20100101 Firefox/45.0",
"Mozilla/5.0 (Windows NT 6.0; WOW64; rv:45.0) Gecko/20100101 Firefox/45.0",
"Mozilla/5.0 (FreeBSD; Viera; rv:34.0) Gecko/20100101 Firefox/34.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:20.0) Gecko/20100101 Firefox/20.0",
"Mozilla/5.0 (Android 6.0; Mobile; rv:60.0) Gecko/20100101 Firefox/60.0",
"Mozilla/5.0 (Windows NT 5.1; rv:37.0) Gecko/20100101 Firefox/37.0",
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:35.0) Gecko/20100101 Firefox/35.0 evaliant",
"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:28.0) Gecko/20100101 Firefox/28.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:58.0) Gecko/20100101 Firefox/58.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:60.0) Gecko/20100101 Firefox/60.0",
"Mozilla/5.0 (Windows NT 10.0; WOW64; rv:45.0) Gecko/20100101 Firefox/45.0",
"Mozilla/5.0 (Windows NT 6.2; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64; rv109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.2 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; rv109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv109.0) Gecko/20100101 Firefox/116.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.82",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv109.0) Gecko/20100101 Firefox/115.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.188",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv109.0) Gecko/20100101 Firefox/114.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64; rv102.0) Gecko/20100101 Firefox/102.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Safari/537.36 Edg/115.0.1901.183",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67",
"Mozilla/5.0 (X11; Linux x86_64; rv109.0) Gecko/20100101 Firefox/114.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 OPR/100.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv102.0) Gecko/20100101 Firefox/102.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv109.0) Gecko/20100101 Firefox/114.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.79",
"Mozilla/5.0 (X11; Linux x86_64; rv109.0) Gecko/20100101 Firefox/116.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Safari/605.1.15",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.75 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36 OPR/99.0.0.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.5 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.4 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; CrOS x86_64 14541.0.0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; rv102.0) Gecko/20100101 Firefox/102.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; WOW64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.5666.197 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; rv109.0) Gecko/20100101 Firefox/116.0",
"Mozilla/5.0 (Windows NT 10.0; rv114.0) Gecko/20100101 Firefox/114.0",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv109.0) Gecko/20100101 Firefox/114.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.88 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.6.1 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv109.0) Gecko/20100101 Firefox/116.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.86",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv109.0) Gecko/20100101 Firefox/113.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.3 Safari/605.1.15",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 YaBrowser/23.5.4.674 Yowser/2.5 Safari/537.36",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv109.0) Gecko/20100101 Firefox/116.0",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36",
}

View File

@ -117,13 +117,11 @@ func (p5 *ProxyEngine) stillGood(sock *Proxy) bool {
if err := p5.proxyMap.delete(sock.Endpoint); err != nil {
p5.dbgPrint(simpleString(err.Error()))
}
return false
}
if p5.badProx.Peek(sock) {
buf := strs.Get()
buf.MustWriteString("badProx dial ratelimited: ")
buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf)
p5.msgBadProxRate(sock)
return false
}

View File

@ -10,7 +10,7 @@ var dummyPool *ants.Pool
func init() {
var err error
dummyPool, err = ants.NewPool(5, ants.WithNonblocking(true))
dummyPool, err = ants.NewPool(5, ants.WithNonblocking(false))
if err != nil {
panic(err)
}

View File

@ -50,11 +50,14 @@ func (p5 *ProxyEngine) LoadProxyTXT(seedFile string) (count int) {
func (p5 *ProxyEngine) LoadSingleProxy(sock string) bool {
var ok bool
if sock, ok = filter(sock); !ok {
p5.dbgPrint(simpleString("invalid proxy format"))
return false
}
if err := p5.loadSingleProxy(sock); err != nil {
p5.dbgPrint(simpleString(err.Error()))
return false
}
// p5.dbgPrint(simpleString("loaded proxy " + sock))
return true
}

View File

@ -1,8 +1,6 @@
package logger
type Logger interface {
// Print implementations at this time are actually println
Print(str string)
Printf(format string, a ...interface{})
Errorf(format string, a ...interface{})
}

View File

@ -39,7 +39,24 @@ func (p5 *ProxyEngine) addTimeout(socksString string) string {
return socksString
}
func (p5 *ProxyEngine) isEmpty() bool {
stats := p5.GetStatistics()
if stats.Checked == 0 {
return true
}
if stats.Valid5+stats.Valid4+stats.Valid4a+stats.ValidHTTP == 0 {
return true
}
return false
}
var ErrNoProxies = fmt.Errorf("no proxies available")
func (p5 *ProxyEngine) popSockAndLockIt(ctx context.Context) (*Proxy, error) {
if p5.isEmpty() {
p5.scale()
return nil, ErrNoProxies
}
sock := p5.GetAnySOCKS()
select {
case <-ctx.Done():
@ -70,9 +87,31 @@ func (p5 *ProxyEngine) popSockAndLockIt(ctx context.Context) (*Proxy, error) {
return nil, nil
}
func (p5 *ProxyEngine) announceDial(network, addr string) {
s := strs.Get()
s.MustWriteString("prox5 dialing: ")
s.MustWriteString(network)
s.MustWriteString("://")
if p5.opt.redact {
s.MustWriteString("[redacted]")
} else {
s.MustWriteString(addr)
}
s.MustWriteString(addr)
s.MustWriteString("...")
p5.dbgPrint(s)
}
// mysteryDialer is a dialer function that will use a different proxy for every request.
// If you're looking for this function, it has been unexported. Use Dial, DialTimeout, or DialContext instead.
func (p5 *ProxyEngine) mysteryDialer(ctx context.Context, network, addr string) (net.Conn, error) {
p5.announceDial(network, addr)
if p5.isEmpty() {
// p5.dbgPrint(simpleString("prox5: no proxies available"))
return nil, ErrNoProxies
}
// pull down proxies from channel until we get a proxy good enough for our spoiled asses
var count = 0
for {

View File

@ -19,7 +19,7 @@ const (
)
var protoMap = map[ProxyProtocol]string{
ProtoSOCKS5: "socks5", ProtoNull: "", ProtoSOCKS4: "socks4", ProtoSOCKS4a: "socks4a",
ProtoSOCKS5: "socks5", ProtoNull: "unknown", ProtoSOCKS4: "socks4", ProtoSOCKS4a: "socks4a",
}
func (p ProxyProtocol) String() string {

204
prox5_test.go Normal file
View File

@ -0,0 +1,204 @@
package prox5
import (
"context"
"errors"
"io"
"net"
"net/http"
"strconv"
"sync/atomic"
"testing"
"time"
"git.tcp.direct/kayos/common/entropy"
"git.tcp.direct/kayos/go-socks5"
)
type randomFail struct {
t *testing.T
failedCount int64
maxFail int64
failOneOutOf int
}
func (rf *randomFail) fail() bool {
if rf.failOneOutOf == 0 {
return false
}
doFail := entropy.OneInA(rf.failOneOutOf)
if !doFail {
return false
}
atomic.AddInt64(&rf.failedCount, 1)
rf.t.Logf("random SOCKS failure triggered, total fail count: %d", rf.failedCount)
if rf.maxFail > 0 && atomic.LoadInt64(&rf.failedCount) > rf.maxFail {
rf.t.Errorf("[FAIL] random SOCKS failure triggered too many times, total fail count: %d", rf.failedCount)
}
return true
}
type dummyHTTPServer struct {
t *testing.T
net.Listener
}
func timeNowJSON() []byte {
js, _ := time.Now().MarshalJSON()
return js
}
func newDummyHTTPSServer(t *testing.T, port int) {
t.Helper()
dtcp := &dummyHTTPServer{t: t}
var err error
if dtcp.Listener, err = net.Listen("tcp", ":"+strconv.Itoa(port)); err != nil && !errors.Is(err, net.ErrClosed) {
t.Fatal(err)
}
go func() {
if err = http.Serve(dtcp, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Duration(entropy.RNG(300)) * time.Millisecond)
if _, err = w.Write(timeNowJSON()); err != nil {
t.Error("[FAIL] http server failed to write JSON: " + err.Error())
}
})); err != nil && !errors.Is(err, net.ErrClosed) {
t.Error("[FAIL] http.Serve error: " + err.Error())
}
}()
t.Cleanup(func() {
_ = dtcp.Close()
})
t.Logf("dummy HTTPS server listening on port %d", port)
}
var ErrRandomFail = errors.New("random failure")
func dummySOCKSServer(t *testing.T, port int, rf ...*randomFail) {
t.Helper()
var failure = &randomFail{t: t, failedCount: int64(0), failOneOutOf: 0}
if len(rf) > 0 {
failure = rf[0]
}
dialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
if failure.fail() {
return nil, ErrRandomFail
}
time.Sleep(time.Duration(entropy.RNG(300)) * time.Millisecond)
return net.Dial(network, addr)
}
server := socks5.NewServer(socks5.WithDial(dialer))
go func() {
err := server.ListenAndServe("tcp", "127.0.0.1:"+strconv.Itoa(port))
if err != nil && !errors.Is(err, net.ErrClosed) {
t.Error("[FAIL] socks server failure: " + err.Error())
}
}()
}
type p5TestLogger struct {
t *testing.T
}
func (tl p5TestLogger) Errorf(format string, args ...interface{}) {
tl.t.Logf(format, args...)
}
func (tl p5TestLogger) Printf(format string, args ...interface{}) {
tl.t.Logf(format, args...)
}
func (tl p5TestLogger) Print(args ...interface{}) {
tl.t.Log(args...)
}
func TestProx5(t *testing.T) {
for i := 0; i < 100; i++ {
dummySOCKSServer(t, 5555+i, &randomFail{
t: t,
failedCount: int64(0),
failOneOutOf: entropy.RNG(100),
maxFail: 50,
})
time.Sleep(time.Millisecond * 5)
}
newDummyHTTPSServer(t, 8055)
time.Sleep(time.Millisecond * 350)
p5 := NewProxyEngine()
p5.SetAndEnableDebugLogger(p5TestLogger{t: t})
p5.SetMaxWorkers(10)
p5.EnableAutoScaler()
p5.SetAutoScalerThreshold(10)
p5.SetAutoScalerMaxScale(100)
// p5.DisableRecycling()
p5.SetRemoveAfter(2)
var index = 5555
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
load := func() {
if index > 5655 {
return
}
time.Sleep(time.Millisecond * 100)
index++
p5.LoadSingleProxy("127.0.0.1:" + strconv.Itoa(index))
}
var successCount int64 = 0
makeReq := func() {
select {
case <-ctx.Done():
return
default:
}
resp, err := p5.GetHTTPClient().Get("http://127.0.0.1:8055")
if err != nil && !errors.Is(err, ErrNoProxies) {
t.Error(err)
}
if errors.Is(err, ErrNoProxies) {
return
}
b, e := io.ReadAll(resp.Body)
if e != nil && !errors.Is(e, net.ErrClosed) {
t.Log("[WARN] " + e.Error())
}
t.Logf("got proxied response: %s", string(b))
atomic.AddInt64(&successCount, 1)
}
ticker := time.NewTicker(time.Millisecond * 100)
if err := p5.Start(); err != nil {
t.Fatal(err)
}
wait := 0
testLoop:
for {
select {
case <-ctx.Done():
successCountFinal := atomic.LoadInt64(&successCount)
if successCountFinal < 10 {
t.Fatal("no successful requests")
}
t.Logf("total successful requests: %d", successCountFinal)
break testLoop
case <-ticker.C:
// pre-warm
wait++
if wait >= 50 {
go makeReq()
}
default:
load()
}
}
cancel()
}

View File

@ -14,7 +14,7 @@ var defaultUseProxyRatelimiter = rl.Policy{
var defaultBadProxyRateLimiter = rl.Policy{
Window: 55,
Burst: 5,
Burst: 10,
}
const (

View File

@ -11,6 +11,7 @@ func (p5 *ProxyEngine) AddUserAgents(uagents []string) {
p5.mu.Lock()
p5.opt.userAgents = append(p5.opt.userAgents, uagents...)
p5.mu.Unlock()
p5.DebugLogger.Printf("added %d useragents to cycle through for proxy validation", len(uagents))
}
// SetUserAgents sets the list of useragents we randomly choose from during proxied requests
@ -18,6 +19,7 @@ func (p5 *ProxyEngine) SetUserAgents(uagents []string) {
p5.mu.Lock()
p5.opt.userAgents = uagents
p5.mu.Unlock()
p5.DebugLogger.Printf("set %d useragents to cycle through for proxy validation", len(uagents))
}
// SetCheckEndpoints replaces the running list of whatismyip style endpoitns for validation. (must return only the WAN IP)
@ -25,6 +27,7 @@ func (p5 *ProxyEngine) SetCheckEndpoints(newendpoints []string) {
p5.mu.Lock()
p5.opt.checkEndpoints = newendpoints
p5.mu.Unlock()
p5.DebugLogger.Printf("set %d check endpoints for proxy validations", len(newendpoints))
}
// AddCheckEndpoints appends entries to the running list of whatismyip style endpoitns for validation. (must return only the WAN IP)
@ -32,6 +35,7 @@ func (p5 *ProxyEngine) AddCheckEndpoints(endpoints []string) {
p5.mu.Lock()
p5.opt.checkEndpoints = append(p5.opt.checkEndpoints, endpoints...)
p5.mu.Unlock()
p5.DebugLogger.Printf("added %d check endpoints for proxy validations", len(endpoints))
}
// SetStaleTime replaces the duration of time after which a proxy will be considered "stale". stale proxies will be skipped upon retrieval.
@ -39,6 +43,7 @@ func (p5 *ProxyEngine) SetStaleTime(newtime time.Duration) {
p5.opt.Lock()
p5.opt.stale = newtime
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 stale time set to %s", newtime)
}
// SetValidationTimeout sets the validationTimeout option.
@ -46,6 +51,7 @@ func (p5 *ProxyEngine) SetValidationTimeout(timeout time.Duration) {
p5.opt.Lock()
p5.opt.validationTimeout = timeout
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 validation timeout set to %s", timeout)
}
// SetServerTimeout sets the serverTimeout option.
@ -55,12 +61,19 @@ func (p5 *ProxyEngine) SetServerTimeout(timeout time.Duration) {
p5.opt.Lock()
p5.opt.serverTimeout = timeout
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 server timeout set to %s", timeout)
}
// SetMaxWorkers set the maximum workers for proxy checking.
func (p5 *ProxyEngine) SetMaxWorkers(num int) {
if p5.isEmpty() && num < 2 {
p5.DebugLogger.
Printf("prox5 cannot set max workers to %d, minimum is 2 until we have some valid proxies", num)
num = 2
}
p5.pool.Tune(num)
p5.scaler.SetBaseline(num)
}
// EnableRecycling enables recycling used proxies back into the pending channel for revalidation after dispensed.
@ -68,6 +81,7 @@ func (p5 *ProxyEngine) EnableRecycling() {
p5.opt.Lock()
p5.opt.recycle = true
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 recycling enabled")
}
// DisableRecycling disables recycling used proxies back into the pending channel for revalidation after dispensed.
@ -75,6 +89,7 @@ func (p5 *ProxyEngine) DisableRecycling() {
p5.opt.Lock()
p5.opt.recycle = false
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 recycling disabled")
}
// SetRemoveAfter sets the removeafter policy, the amount of times a recycled proxy is marked as bad before it is removed entirely.
@ -85,6 +100,7 @@ func (p5 *ProxyEngine) SetRemoveAfter(timesfailed int) {
p5.opt.Lock()
p5.opt.removeafter = timesfailed
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 removeafter policy set to %d", timesfailed)
}
// SetDialerBailout sets the amount of times the mysteryDialer will dial out and fail before it bails out.
@ -93,6 +109,7 @@ func (p5 *ProxyEngine) SetDialerBailout(dialattempts int) {
p5.opt.Lock()
p5.opt.dialerBailout = dialattempts
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 dialer bailout set to %d", dialattempts)
}
// SetDispenseMiddleware will add a function that sits within the dialing process of the mysteryDialer and anyhing using it.
@ -102,70 +119,94 @@ func (p5 *ProxyEngine) SetDispenseMiddleware(f func(*Proxy) (*Proxy, bool)) {
p5.mu.Lock()
p5.dispenseMiddleware = f
p5.mu.Unlock()
p5.DebugLogger.Printf("prox5 dispense middleware set")
}
// SetDebugLogger sets the debug logger for the ProxyEngine. See the Logger interface for implementation details.
//
// Deprecated: use SetLogger instead. This will be removed in a future version.
func (p5 *ProxyEngine) SetDebugLogger(l logger.Logger) {
p5.SetLogger(l)
}
// SetLogger sets the debug logger for the ProxyEngine. See the Logger interface for implementation details.
func (p5 *ProxyEngine) SetLogger(l logger.Logger) {
debugHardLock.Lock()
p5.mu.Lock()
p5.DebugLogger = l
p5.mu.Unlock()
debugHardLock.Unlock()
p5.DebugLogger.Printf("prox5 debug logger set")
}
func (p5 *ProxyEngine) SetAndEnableDebugLogger(l logger.Logger) {
p5.SetLogger(l)
p5.EnableDebug()
}
// EnableAutoScaler enables the autoscaler.
// This will automatically scale up the number of workers based on the threshold of dial attempts versus validated proxies.
func (p5 *ProxyEngine) EnableAutoScaler() {
p5.scaler.Enable()
p5.DebugLogger.Printf("prox5 autoscaler enabled")
}
// DisableAutoScaler disables the autoscaler.
func (p5 *ProxyEngine) DisableAutoScaler() {
p5.scaler.Disable()
p5.DebugLogger.Printf("prox5 autoscaler disabled")
}
// SetAutoScalerMaxScale sets the relative maximum amount that the autoscaler will scale up.
func (p5 *ProxyEngine) SetAutoScalerMaxScale(max int) {
p5.scaler.SetMax(max)
p5.DebugLogger.Printf("prox5 autoscaler max scale set to %d", max)
}
// SetAutoScalerThreshold sets the threshold of validated proxies versus dials that will trigger the autoscaler.
func (p5 *ProxyEngine) SetAutoScalerThreshold(threshold int) {
p5.scaler.SetThreshold(threshold)
p5.DebugLogger.Printf("prox5 autoscaler threshold set to %d", threshold)
}
func (p5 *ProxyEngine) EnableDebugRedaction() {
p5.opt.Lock()
p5.opt.redact = true
p5.opt.Unlock()
p5.DebugLogger.Printf("[redacted]")
}
func (p5 *ProxyEngine) DisableDebugRedaction() {
p5.opt.Lock()
p5.opt.redact = false
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 redaction disabled")
}
func (p5 *ProxyEngine) EnableRecyclerShuffling() {
p5.opt.Lock()
p5.opt.shuffle = true
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 recycler shuffling enabled")
}
func (p5 *ProxyEngine) DisableRecyclerShuffling() {
p5.opt.Lock()
p5.opt.shuffle = false
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 recycler shuffling disabled")
}
func (p5 *ProxyEngine) EnableHTTPClientTLSVerification() {
p5.opt.Lock()
p5.opt.tlsVerify = true
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 HTTP client TLS verification enabled")
}
func (p5 *ProxyEngine) DisableHTTPClientTLSVerification() {
p5.opt.Lock()
p5.opt.tlsVerify = false
p5.opt.Unlock()
p5.DebugLogger.Printf("prox5 HTTP client TLS verification disabled")
}

View File

@ -154,6 +154,19 @@ func (hmd *handMeDown) Dial(network, addr string) (c net.Conn, err error) {
return hmd.conn, nil
}
func (p5 *ProxyEngine) announceValidating(sock *Proxy, presplit string) {
if sock == nil {
return
}
s := strs.Get()
s.MustWriteString("validating ")
s.MustWriteString(sock.GetProto().String())
s.MustWriteString("://")
s.MustWriteString(presplit)
p5.dbgPrint(s)
}
func (p5 *ProxyEngine) singleProxyCheck(sock *Proxy, protocol ProxyProtocol) error {
defer p5.anothaOne()
split := strings.Split(sock.Endpoint, "@")
@ -161,6 +174,9 @@ func (p5 *ProxyEngine) singleProxyCheck(sock *Proxy, protocol ProxyProtocol) err
if len(split) == 2 {
endpoint = split[1]
}
// p5.announceValidating(sock, endpoint)
conn, err := net.DialTimeout("tcp", endpoint, p5.GetValidationTimeout())
if err != nil {
return err
@ -185,6 +201,9 @@ func (p5 *ProxyEngine) singleProxyCheck(sock *Proxy, protocol ProxyProtocol) err
}
func (sock *Proxy) validate() {
if sock == nil || sock.parent == nil {
return
}
if !atomic.CompareAndSwapUint32(&sock.lock, stateUnlocked, stateLocked) {
return
}