Fix: pause/resume, Finish: authed proxies

This commit is contained in:
kayos@tcp.direct 2021-10-23 10:25:28 -07:00
parent 0bf2e0f711
commit 1679141997
10 changed files with 181 additions and 167 deletions

View File

@ -5,7 +5,6 @@ import "errors"
// SwampStatus represents the current state of our Swamp.
type SwampStatus uint32
const (
// Running means the proxy pool is currently taking in and validating proxies, and is available to dispense them.
Running SwampStatus = iota
@ -17,19 +16,14 @@ const (
// Start starts our proxy pool operations. Trying to start a running Swamp will return an error.
func (s *Swamp) Start() error {
s.mu.RLock()
if s.Status != New {
s.mu.RUnlock()
if s.Status.Load().(SwampStatus) != New {
return errors.New("this swamp is not new, use resume if it is paused")
}
s.mu.RUnlock()
// mapBuilder builds a deduplicated map of valid IPs and ports
go s.mapBuilder()
// jobSpawner feeds jobs to the pond continuously
go s.jobSpawner()
s.mu.Lock()
s.Status = Running
s.mu.Unlock()
s.runningdaemons.Store(0)
s.getThisDread()
return nil
}
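The heart of this change is replacing the mutex-guarded Status field with an atomic.Value. A minimal runnable sketch of that pattern, using stand-in types rather than the real Swamp (Go 1.17 predates generic atomics, hence the type assertion on Load):

package main

import (
    "errors"
    "fmt"
    "sync/atomic"
)

type SwampStatus uint32

const (
    New SwampStatus = iota
    Running
    Paused
)

type swamp struct {
    status atomic.Value // holds a SwampStatus
}

func (s *swamp) start() error {
    // Load returns interface{}, so the value is asserted back to SwampStatus.
    if s.status.Load().(SwampStatus) != New {
        return errors.New("this swamp is not new, use resume if it is paused")
    }
    s.status.Store(Running)
    return nil
}

func main() {
    s := &swamp{}
    s.status.Store(New)    // seed the value; Load on an empty atomic.Value returns nil
    fmt.Println(s.start()) // <nil>
    fmt.Println(s.start()) // this swamp is not new, use resume if it is paused
}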
@ -41,34 +35,55 @@ Pause will cease the creation of any new proxy validation operations.
* Pausing an already paused Swamp is a no-op.
*/
func (s *Swamp) Pause() error {
if s.IsRunning() {
return errors.New("already paused")
if !s.IsRunning() {
return errors.New("not running")
}
s.dbgPrint("pausing...")
var svcbuf = make(chan bool, s.svcStatus())
for s.svcStatus() != 0 {
svcbuf <- true
select {
case q := <- svcbuf:
s.quit <- q
default:
// keep dispatching svcDown until no daemons remain running
var svcbuf = make(chan bool, 2)
for {
select {
case svcbuf <- true:
//
default:
break
}
select {
case <-svcbuf:
go s.svcDown()
default:
break
}
if !s.IsRunning() {
break
}
}
s.Status = Paused
s.Status.Store(Paused)
return nil
}
func (s *Swamp) getThisDread() {
go s.mapBuilder()
<-s.conductor
go s.jobSpawner()
// spin until the daemons report in, then mark the swamp as running
for {
if s.IsRunning() {
s.Status.Store(Running)
break
}
}
}
// Resume will resume paused proxy pool operations; attempting to resume a running Swamp returns an error.
func (s *Swamp) Resume() error {
s.conductor = make(chan bool, 2)
if s.IsRunning() || s.Status == New {
if s.IsRunning() {
return errors.New("not paused")
}
go s.mapBuilder()
go s.jobSpawner()
<-s.conductor
s.getThisDread()
return nil
}
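Resume rebuilds the conductor channel and hands off to getThisDread, which sequences daemon startup: the job spawner only starts once the map builder has signalled readiness over conductor. A stripped-down sketch of that handshake, with placeholder goroutines standing in for the real daemons:

package main

import "fmt"

func main() {
    conductor := make(chan bool)
    done := make(chan bool)

    go func() { // stands in for mapBuilder
        fmt.Println("map builder up")
        conductor <- true // signal readiness to the caller
    }()

    <-conductor // getThisDread blocks here until the first daemon is live

    go func() { // stands in for jobSpawner
        fmt.Println("job spawner up")
        done <- true
    }()

    <-done
}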

View File

@ -5,25 +5,16 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"
)
func (s *Swamp) svcUp() {
s.mu.Lock()
s.runningdaemons++
s.conductor <- true
s.mu.Unlock()
s.runningdaemons.Store(s.runningdaemons.Load().(int) + 1)
}
func (s *Swamp) svcDown() {
s.mu.Lock()
s.runningdaemons--
s.mu.Unlock()
}
func (s *Swamp) svcStatus() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.runningdaemons
s.quit <- true
s.runningdaemons.Store(s.runningdaemons.Load().(int) - 1)
}
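Worth noting: Load-then-Store on an atomic.Value, as svcUp and svcDown now do, is not an atomic increment, so two daemons racing can lose an update. A race-free sketch of the same bookkeeping using atomic.AddInt32 instead (hypothetical names, not the project's API):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type pool struct {
    runningdaemons int32
    quit           chan bool
}

func (p *pool) svcUp()   { atomic.AddInt32(&p.runningdaemons, 1) }
func (p *pool) svcDown() { atomic.AddInt32(&p.runningdaemons, -1) }

func (p *pool) daemon(name string) {
    p.svcUp()
    defer p.svcDown()
    <-p.quit // park until pause signals us
    fmt.Println(name, "paused")
}

// pause keeps signalling until every daemon has decremented the counter.
func (p *pool) pause() {
    for atomic.LoadInt32(&p.runningdaemons) > 0 {
        select {
        case p.quit <- true:
        default:
        }
        time.Sleep(10 * time.Millisecond)
    }
}

func main() {
    p := &pool{quit: make(chan bool)}
    go p.daemon("mapBuilder")
    go p.daemon("jobSpawner")
    time.Sleep(50 * time.Millisecond) // let both daemons register
    p.pause()
    fmt.Println("all daemons down")
}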
type swampMap struct {
@ -40,15 +31,14 @@ func (sm swampMap) add(sock string) (*Proxy, bool) {
return nil, false
}
p := &Proxy{
sm.plot[sock] = &Proxy{
Endpoint: sock,
lock: stateUnlocked,
parent: sm.parent,
}
p.timesValidated.Store(0)
p.timesBad.Store(0)
sm.plot[sock] = p
sm.plot[sock].timesValidated.Store(0)
sm.plot[sock].timesBad.Store(0)
return sm.plot[sock], true
}
@ -62,6 +52,7 @@ func (sm swampMap) exists(sock string) bool {
func (sm swampMap) delete(sock string) error {
sm.mu.Lock()
defer sm.mu.Unlock()
if !sm.exists(sock) {
return errors.New("proxy does not exist in map")
}
@ -69,7 +60,7 @@ func (sm swampMap) delete(sock string) error {
randSleep()
}
sm.plot[sock]=nil
sm.plot[sock] = nil
delete(sm.plot, sock)
return nil
}
@ -77,6 +68,7 @@ func (sm swampMap) delete(sock string) error {
func (sm swampMap) clear() {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.plot = make(map[string]*Proxy)
}
@ -84,41 +76,37 @@ func (s *Swamp) mapBuilder() {
var filtered string
var ok bool
s.svcUp()
defer func() {
s.svcDown()
s.dbgPrint("map builder paused")
}()
defer s.dbgPrint("map builder paused")
s.dbgPrint("map builder started")
for {
select {
case in := <-inChan:
if filtered, ok = filter(in); !ok {
continue
go func() {
for {
select {
case in := <-inChan:
if filtered, ok = filter(in); !ok {
continue
}
if p, ok := s.swampmap.add(filtered); !ok {
continue
} else {
s.Pending <- p
}
default:
//
}
if p, ok := s.swampmap.add(filtered); !ok {
continue
} else {
s.Pending <- p
}
case <-s.quit:
return
default:
//
}
}
}()
s.conductor <- true
s.svcUp()
<-s.quit
}
func (s *Swamp) recycling() int {
if !s.GetRecyclingStatus() {
return 0
}
s.mu.RLock()
s.swampmap.mu.RLock()
defer s.mu.RUnlock()
defer s.swampmap.mu.RUnlock()
if len(s.swampmap.plot) < 1 {
return 0
@ -138,26 +126,25 @@ func (s *Swamp) recycling() int {
}
func (s *Swamp) jobSpawner() {
s.svcUp()
s.dbgPrint("job spawner started")
defer func() {
s.svcDown()
s.dbgPrint("job spawner paused")
}()
for {
if s.Status == Paused {
return
}
select {
case sock := <-s.Pending:
if err := s.pool.Submit(sock.validate); err != nil {
s.dbgPrint(ylw + err.Error() + rst)
defer s.dbgPrint("map builder paused")
go func() {
for {
select {
case sock := <-s.Pending:
if err := s.pool.Submit(sock.validate); err != nil {
s.dbgPrint(ylw + err.Error() + rst)
}
default:
time.Sleep(1 * time.Second)
count := s.recycling()
s.dbgPrint(ylw + "recycled " + strconv.Itoa(count) + " proxies from our map" + rst)
}
case <-s.quit:
return
default:
count := s.recycling()
s.dbgPrint(ylw + "recycled " + strconv.Itoa(count) + " proxies from our map" + rst)
}
}
}()
s.svcUp()
<-s.quit
}
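Both daemons now run their loops in a child goroutine and park the outer call on <-s.quit. A condensed, runnable sketch of the job spawner's shape, feeding work to an ants pool until a quit signal arrives (placeholder channel names):

package main

import (
    "fmt"
    "time"

    "github.com/panjf2000/ants/v2"
)

func main() {
    pool, _ := ants.NewPool(5)
    defer pool.Release()

    pending := make(chan string, 10)
    quit := make(chan bool)

    for i := 0; i < 10; i++ {
        pending <- fmt.Sprintf("proxy-%d", i)
    }

    go func() {
        time.Sleep(200 * time.Millisecond)
        quit <- true // simulate Pause
    }()

    for {
        select {
        case sock := <-pending:
            _ = pool.Submit(func() { fmt.Println("validating", sock) })
        case <-quit:
            fmt.Println("job spawner paused")
            return
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}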

View File

@ -2,30 +2,16 @@ package Prox5
import (
"sync"
rate5 "github.com/yunginnanet/Rate5"
)
var (
useDebugChannel = false
debugChan chan string
debugMutex *sync.RWMutex
debugRatelimit *rate5.Limiter
)
type debugLine struct {
s string
}
// UniqueKey implements rate5's Identity interface.
// https://pkg.go.dev/github.com/yunginnanet/Rate5#Identity
func (dbg debugLine) UniqueKey() string {
return dbg.s
}
func init() {
debugMutex = &sync.RWMutex{}
debugRatelimit = rate5.NewStrictLimiter(120, 2)
}
// DebugChannel returns a channel that will receive debug messages once debug is enabled.
@ -66,10 +52,6 @@ func (s *Swamp) dbgPrint(str string) {
return
}
if debugRatelimit.Check(debugLine{s: str}) {
return
}
if useDebugChannel {
select {
case debugChan <- str:
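The tail of dbgPrint (truncated above) ends with a select on the debug channel; the send is non-blocking, so messages are dropped rather than stalling the caller when nothing is draining the channel. The pattern in isolation:

package main

import "fmt"

func main() {
    debugChan := make(chan string, 2)
    for i := 0; i < 4; i++ {
        msg := fmt.Sprintf("debug line %d", i)
        select {
        case debugChan <- msg: // delivered while there is buffer space
        default:
            fmt.Println("dropped:", msg)
        }
    }
    fmt.Println("buffered:", len(debugChan)) // 2
}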

28
defs.go
View File

@ -26,7 +26,7 @@ type Swamp struct {
// Stats holds the Statistics for our swamp
Stats *Statistics
Status SwampStatus
Status atomic.Value
// Pending is a constant stream of proxy strings to be verified
Pending chan *Proxy
@ -43,11 +43,11 @@ type Swamp struct {
reaper sync.Pool
mu *sync.RWMutex
pool *ants.Pool
swampopt *swampOptions
runningdaemons int
runningdaemons atomic.Value
conductor chan bool
mu *sync.RWMutex
}
var (
@ -95,7 +95,7 @@ func defOpt() *swampOptions {
sm.dialerBailout.Store(defBailout)
sm.stale.Store(defaultStaleTime)
sm.maxWorkers.Store(defWorkers)
sm.maxWorkers = defWorkers
return sm
}
@ -126,14 +126,19 @@ type swampOptions struct {
// stale is the amount of time since verification that qualifies a proxy as stale.
// if a stale proxy is drawn during the use of our getter functions, it will be skipped.
stale atomic.Value
// userAgents contains a list of userAgents to be randomly drawn from for proxied requests; this should be supplied via SetUserAgents
userAgents []string
// debug when enabled will print results as they come in
debug atomic.Value
// checkEndpoints includes web services that respond with (just) the WAN IP of the connection for validation purposes
checkEndpoints []string
// maxWorkers determines the maximum number of workers used for checking proxies
maxWorkers atomic.Value
maxWorkers int
// validationTimeout defines the timeout for proxy validation operations.
// It applies to both the initial quick check (dial) and the second check (HTTP GET).
validationTimeout atomic.Value
@ -170,8 +175,8 @@ type Proxy struct {
// timesBad is the number of times the proxy has been marked as bad.
timesBad atomic.Value
parent *Swamp
lock uint32
parent *Swamp
lock uint32
hardlock *sync.Mutex
}
@ -202,10 +207,13 @@ func NewDefaultSwamp() *Swamp {
swampopt: defOpt(),
quit: make(chan bool),
conductor: make(chan bool),
mu: &sync.RWMutex{},
Status: New,
Status: atomic.Value{},
}
s.Status.Store(New)
s.swampmap = swampMap{
plot: make(map[string]*Proxy),
mu: &sync.RWMutex{},
@ -214,11 +222,13 @@ func NewDefaultSwamp() *Swamp {
s.socksServerLogger = socksLogger{parent: s}
s.runningdaemons.Store(0)
s.useProx = rl.NewCustomLimiter(s.swampopt.useProxConfig)
s.badProx = rl.NewCustomLimiter(s.swampopt.badProxConfig)
var err error
s.pool, err = ants.NewPool(s.swampopt.maxWorkers.Load().(int), ants.WithOptions(ants.Options{
s.pool, err = ants.NewPool(s.swampopt.maxWorkers, ants.WithOptions(ants.Options{
ExpiryDuration: 2 * time.Minute,
PanicHandler: s.pondPanic,
}))
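For reference, the shape of the ants pool construction above, runnable on its own; the panic handler here is a stand-in for s.pondPanic:

package main

import (
    "log"
    "time"

    "github.com/panjf2000/ants/v2"
)

func main() {
    pool, err := ants.NewPool(5, ants.WithOptions(ants.Options{
        ExpiryDuration: 2 * time.Minute, // idle workers are reaped after this
        PanicHandler: func(i interface{}) {
            log.Println("worker panic:", i) // stand-in for s.pondPanic
        },
    }))
    if err != nil {
        log.Fatal(err)
    }
    defer pool.Release()

    _ = pool.Submit(func() { log.Println("hello from the pond") })
    time.Sleep(100 * time.Millisecond) // give the worker a beat to run
}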

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"strconv"
"time"
"github.com/mattn/go-tty"
@ -16,9 +17,8 @@ var quit chan bool
func init() {
quit = make(chan bool)
swamp = Prox5.NewDefaultSwamp()
if err := swamp.SetMaxWorkers(5000); err != nil {
panic(err)
}
swamp.SetMaxWorkers(5)
swamp.EnableDebug()
count := swamp.LoadProxyTXT("socks.list")
if count < 1 {
@ -54,12 +54,7 @@ func watchKeyPresses() {
if err != nil {
panic(err)
}
defer func(t *tty.TTY) {
err := t.Close()
if err != nil {
panic(err)
}
}(t)
var done = false
for {
r, err := t.ReadRune()
@ -75,6 +70,12 @@ func watchKeyPresses() {
println("enabling debug")
swamp.EnableDebug()
}
case "+":
swamp.SetMaxWorkers(swamp.GetMaxWorkers() + 1)
println("New worker count: " + strconv.Itoa(swamp.GetMaxWorkers()))
case "-":
swamp.SetMaxWorkers(swamp.GetMaxWorkers() - 1)
println("New worker count: " + strconv.Itoa(swamp.GetMaxWorkers()))
case "a":
go get("4")
case "b":
@ -82,7 +83,7 @@ func watchKeyPresses() {
case "c":
go get("5")
case "p":
if swamp.Status == 0 {
if swamp.IsRunning() {
err := swamp.Pause()
if err != nil {
println(err.Error())
@ -93,23 +94,30 @@ func watchKeyPresses() {
}
}
case "q":
quit <- true
done = true
break
default:
time.Sleep(25 * time.Millisecond)
//
}
if done {
break
}
}
t.Close()
quit <- true
return
}
func main() {
go watchKeyPresses()
for {
select {
case <-quit:
return
default:
go func() {
for {
fmt.Printf("4: %d, 4a: %d, 5: %d \n", swamp.Stats.Valid4, swamp.Stats.Valid4a, swamp.Stats.Valid5)
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
}
}
}()
<-quit
}

View File

@ -42,15 +42,16 @@ func (s *Swamp) GetTimeoutSecondsStr() string {
// GetMaxWorkers returns the maximum number of workers that validate proxies concurrently.
func (s *Swamp) GetMaxWorkers() int {
return s.swampopt.maxWorkers.Load().(int)
return s.pool.Cap()
}
// IsRunning returns true if our background goroutines defined in daemons.go are currently operational
func (s *Swamp) IsRunning() bool {
if s.runningdaemons == 2 {
return true
if s.runningdaemons.Load() == nil {
println("nil")
return false
}
return false
return s.runningdaemons.Load().(int) > 0
}
// GetRecyclingStatus retrieves the current recycling status, see EnableRecycling.

1
go.mod
View File

@ -5,6 +5,7 @@ go 1.17
require (
git.tcp.direct/kayos/go-socks5 v1.0.1
github.com/mattn/go-tty v0.0.3
github.com/miekg/dns v1.1.43
github.com/panjf2000/ants/v2 v2.4.6
github.com/yunginnanet/Rate5 v0.4.3
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b

View File

@ -6,7 +6,9 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"github.com/miekg/dns"
ipa "inet.af/netaddr"
)
@ -15,7 +17,7 @@ import (
var inChan chan string
func init() {
inChan = make(chan string, 1000000)
inChan = make(chan string, 100000)
}
func filter(in string) (filtered string, ok bool) {
@ -33,7 +35,10 @@ func filter(in string) (filtered string, ok bool) {
return in, false
}
return combo.String(), true
case 3:
case 4:
if _, ok := dns.IsDomainName(split[0]); ok {
return fmt.Sprintf("%s:%s@%s:%s", split[2], split[3], split[0], split[1]), true
}
combo, err := ipa.ParseIPPort(split[0] + ":" + split[1])
if err != nil {
return in, false
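The new case 4 branch is what finishes authed-proxy ingestion: a host:port:user:pass line is rewritten to user:pass@host:port, with dns.IsDomainName (from github.com/miekg/dns, the new go.mod dependency) deciding whether the first field is a hostname. That branch, extracted into a hypothetical helper:

package main

import (
    "fmt"
    "strings"

    "github.com/miekg/dns"
)

// toAuthed rewrites a 4-field host:port:user:pass line to user:pass@host:port.
func toAuthed(in string) (string, bool) {
    split := strings.Split(in, ":")
    if len(split) != 4 {
        return in, false
    }
    // IsDomainName is a syntactic check only; it does not resolve anything.
    if _, ok := dns.IsDomainName(split[0]); !ok {
        return in, false
    }
    return fmt.Sprintf("%s:%s@%s:%s", split[2], split[3], split[0], split[1]), true
}

func main() {
    fmt.Println(toAuthed("proxy.example.com:1080:alice:hunter2"))
    // alice:hunter2@proxy.example.com:1080 true
}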
@ -69,32 +74,37 @@ func filter(in string) (filtered string, ok bool) {
// LoadProxyTXT loads proxies from a given seed file and feeds them to the mapBuilder, where they are automatically queued for validation.
func (s *Swamp) LoadProxyTXT(seedFile string) int {
var count int
var filtered string
var count = &atomic.Value{}
count.Store(0)
var ok bool
s.dbgPrint("LoadProxyTXT start: " + seedFile)
defer s.dbgPrint("LoadProxyTXT finished: " + strconv.Itoa(count))
defer func() {
s.dbgPrint("LoadProxyTXT finished: " + strconv.Itoa(count.Load().(int)))
}()
f, err := os.Open(seedFile)
if err != nil {
s.dbgPrint(red + err.Error() + rst)
return 0
}
scan := bufio.NewScanner(f)
for scan.Scan() {
if filtered, ok = filter(scan.Text()); !ok {
if _, ok = filter(scan.Text()); !ok {
continue
}
go s.LoadSingleProxy(filtered)
count++
count.Store(count.Load().(int)+1)
go s.LoadSingleProxy(scan.Text())
}
if err := f.Close(); err != nil {
s.dbgPrint(err.Error())
}
return count
return count.Load().(int)
}
// LoadSingleProxy loads a SOCKS proxy into our map. Uses the format: 127.0.0.1:1080 (host:port).

View File

@ -43,9 +43,8 @@ func (s *Swamp) SetValidationTimeout(timeout time.Duration) {
}
// SetMaxWorkers sets the maximum number of workers used for proxy checking by retuning the worker pool.
func (s *Swamp) SetMaxWorkers(num int) error {
func (s *Swamp) SetMaxWorkers(num int) {
s.pool.Tune(num)
return nil
}
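Since ants pools can be resized live with Tune, SetMaxWorkers no longer needs an error return, and GetMaxWorkers can simply read pool.Cap(). In isolation:

package main

import (
    "fmt"

    "github.com/panjf2000/ants/v2"
)

func main() {
    pool, _ := ants.NewPool(5)
    defer pool.Release()
    fmt.Println(pool.Cap()) // 5
    pool.Tune(10)           // resize the pool while it is running
    fmt.Println(pool.Cap()) // 10
}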
// EnableRecycling enables recycling used proxies back into the pending channel for revalidation after they are dispensed.

View File

@ -4,10 +4,12 @@ import (
"bytes"
"crypto/tls"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strings"
// "net/url"
"sync/atomic"
"time"
@ -61,19 +63,13 @@ func (s *Swamp) checkHTTP(sock *Proxy) (string, error) {
return "", err
}
var t int
switch sock.Proto.Load().(string) {
case "5":
t = socks.SOCKS5
case "4":
t = socks.SOCKS4
case "4a":
t = socks.SOCKS4A
default:
return "", errors.New("invalid protocol loaded")
}
var dialSocks = socks.Dial(fmt.Sprintf(
"socks%s://%s/?timeout=%ss",
sock.Proto.Load().(string),
sock.Endpoint,
s.GetTimeoutSecondsStr()),
)
var dialSocks = socks.DialSocksProxy(t, sock.Endpoint)
var transportDialer = dialSocks
if sock.Proto.Load().(string) == "none" {
transportDialer = proxy.Direct.Dial
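The switch from DialSocksProxy to the URI form of socks.Dial is the other half of authed-proxy support: assuming the dialer is h12.io/socks (which matches both function names in the diff), credentials and timeout ride along in the proxy URI itself. A hypothetical usage sketch:

package main

import (
    "fmt"
    "net/http"
    "time"

    "h12.io/socks"
)

func main() {
    // Credentials are embedded in the URI; the timeout is parsed from the query string.
    dial := socks.Dial("socks5://alice:hunter2@127.0.0.1:1080?timeout=5s")
    client := &http.Client{
        Transport: &http.Transport{Dial: dial},
        Timeout:   10 * time.Second,
    }
    resp, err := client.Get("http://example.com")
    if err != nil {
        fmt.Println(err) // expected unless a proxy is actually listening
        return
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status)
}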
@ -117,10 +113,13 @@ func (s *Swamp) anothaOne() {
func (s *Swamp) singleProxyCheck(sock *Proxy) error {
defer s.anothaOne()
if _, err := net.DialTimeout("tcp", sock.Endpoint,
split := strings.Split(sock.Endpoint, "@")
endpoint := split[0]
if len(split) == 2 {
endpoint = split[1]
}
if _, err := net.DialTimeout("tcp", endpoint,
s.swampopt.validationTimeout.Load().(time.Duration)); err != nil {
s.badProx.Check(sock)
return err
}
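The raw TCP reachability probe needs the same treatment: a net.DialTimeout against user:pass@host:port would fail to parse, so the credential prefix is stripped first. The same logic, pulled into a hypothetical helper:

package main

import (
    "fmt"
    "net"
    "strings"
    "time"
)

// bareEndpoint strips an optional user:pass@ prefix, leaving host:port for the raw dial.
func bareEndpoint(endpoint string) string {
    split := strings.Split(endpoint, "@")
    if len(split) == 2 {
        return split[1]
    }
    return split[0]
}

func main() {
    ep := bareEndpoint("alice:hunter2@127.0.0.1:1080")
    fmt.Println(ep) // 127.0.0.1:1080
    if _, err := net.DialTimeout("tcp", ep, time.Second); err != nil {
        fmt.Println(err) // expected unless something is listening on 1080
    }
}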
@ -146,7 +145,7 @@ func (sock *Proxy) validate() {
s := sock.parent
if s.useProx.Check(sock) {
s.dbgPrint(ylw + "useProx ratelimited: " + sock.Endpoint + rst)
// s.dbgPrint(ylw + "useProx ratelimited: " + sock.Endpoint + rst)
atomic.StoreUint32(&sock.lock, stateUnlocked)
return
}
@ -161,7 +160,7 @@ func (sock *Proxy) validate() {
// try to use the proxy with all 3 SOCKS versions
var good = false
for _, sver := range sversions {
if s.Status == Paused {
if s.Status.Load().(SwampStatus) == Paused {
return
}
@ -174,11 +173,13 @@ func (sock *Proxy) validate() {
// }
good = true
break
} else {
s.dbgPrint(err.Error())
}
}
if !good {
s.dbgPrint(red + "failed to verify: " + sock.Endpoint)
s.dbgPrint(red + "failed to verify: " + sock.Endpoint + rst)
sock.bad()
s.badProx.Check(sock)
atomic.StoreUint32(&sock.lock, stateUnlocked)