This commit is contained in:
kayos@tcp.direct 2021-09-19 18:23:18 -07:00
parent 1f85f7d01e
commit 71c98159e7
7 changed files with 151 additions and 99 deletions

View File

@ -4,16 +4,15 @@ import "errors"
// Start starts our proxy pool operations. Trying to start a running Swamp is a nonop.
func (s *Swamp) Start() error {
if len(s.scvm) < 1 {
return errors.New("there are no proxies in the list")
if s.started {
return errors.New("already running")
}
if !s.started {
go s.tossUp()
go s.feed()
s.started = true
return nil
}
return errors.New("already running")
// mapBuilder builds deduplicated map with valid ips and ports
go s.mapBuilder()
// tossUp feeds jobs to pond continuously
go s.jobSpawner()
s.started = true
return nil
}
/*
@ -35,14 +34,11 @@ func (s *Swamp) Pause() {
// Resume will resume pause proxy pool operations, attempting to resume a running Swamp is a non-op.
func (s *Swamp) Resume() error {
if s.Status == Running {
return nil
}
if len(s.scvm) < 1 {
return errors.New("there are no proxies in the list")
if s.IsRunning() {
return errors.New("already running")
}
s.Status = Running
go s.feed()
go s.tossUp()
go s.mapBuilder()
go s.jobSpawner()
return nil
}

64
daemons.go Normal file
View File

@ -0,0 +1,64 @@
package pxndscvm
import "time"
func (s *Swamp) svcUp() {
s.mu.Lock()
s.runningdaemons++
s.mu.Unlock()
}
func (s *Swamp) svcDown() {
s.mu.Lock()
s.runningdaemons--
s.mu.Unlock()
}
func (s *Swamp) mapBuilder() {
s.svcUp()
s.dbgPrint("map builder started")
defer func() {
s.svcDown()
s.dbgPrint("map builder paused")
}()
for {
select {
case in := <-inChan:
if !s.stage1(in) {
continue
}
s.mu.Lock()
s.swampmap[in] = &Proxy{
Endpoint: in,
}
s.mu.Unlock()
case <- s.quit:
return
default:
time.Sleep(10 * time.Millisecond)
}
}
}
func (s *Swamp) jobSpawner() {
s.svcUp()
s.dbgPrint("job spawner started")
defer func() {
s.svcDown()
s.dbgPrint("job spawner paused")
}()
for {
if s.Status == Paused {
return
}
select {
case <-s.quit:
return
default:
go s.pool.Submit(s.validate)
time.Sleep(time.Duration(10) * time.Millisecond)
}
}
}

View File

@ -62,7 +62,9 @@ func (s *Swamp) dbgPrint(str string) {
return
}
if useDebugChannel {
debugChan <- str
go func() {
debugChan <- str
}()
return
}
println("pxndscvm: " + str)

17
defs.go
View File

@ -40,12 +40,15 @@ type Swamp struct {
useProx *rl.Limiter
badProx *rl.Limiter
quit chan bool
scvm []string
pool *pond.WorkerPool
swampopt *swampOptions
started bool
mu *sync.RWMutex
quit chan bool
swampmap map[string]*Proxy
pool *pond.WorkerPool
swampopt *swampOptions
runningdaemons int
started bool
mu *sync.RWMutex
}
var (
@ -153,6 +156,8 @@ func NewDefaultSwamp() *Swamp {
mu: &sync.RWMutex{},
}
s.swampmap = make(map[string]*Proxy)
s.useProx = rl.NewCustomLimiter(s.swampopt.useProxConfig)
s.badProx = rl.NewCustomLimiter(s.swampopt.badProxConfig)

View File

@ -2,20 +2,9 @@ package pxndscvm
import (
"net"
"sync/atomic"
"time"
)
const (
stateUnlocked uint32 = iota
stateLocked
)
var dialPrioritySpinlock uint32
func init() {
atomic.StoreUint32(&dialPrioritySpinlock, stateUnlocked)
}
// Socks5Str gets a SOCKS5 proxy that we have fully verified (dialed and then retrieved our IP address from a what-is-my-ip endpoint.
// Will block if one is not available!
@ -93,6 +82,7 @@ func (s *Swamp) GetAnySOCKS() Proxy {
}
func (s *Swamp) stillGood(candidate Proxy) bool {
if s.badProx.Peek(candidate) {
s.dbgPrint(ylw + "badProx dial ratelimited: " + candidate.Endpoint + rst)
return false
@ -108,6 +98,7 @@ func (s *Swamp) stillGood(candidate Proxy) bool {
go s.Stats.stale()
return false
}
return true
}
@ -145,5 +136,14 @@ func (s *Swamp) GetMaxWorkers() int {
defer s.mu.RUnlock()
return s.swampopt.maxWorkers
}
// IsRunning returns true if our background goroutines defined in daemons.go are currently operational
func (s *Swamp) IsRunning() bool {
if s.runningdaemons == 2 {
return true
}
return false
}
// TODO: Implement ways to access worker pool (pond) statistics

View File

@ -2,7 +2,6 @@ package pxndscvm
import (
"bufio"
"errors"
"os"
"strconv"
"strings"
@ -10,61 +9,81 @@ import (
ipa "inet.af/netaddr"
)
// throw shit proxies here, get map
var inChan chan string
func init() {
inChan = make(chan string, 100)
}
func (s *Swamp) stage1(in string) bool {
if !strings.Contains(in, ":") {
return false
}
split := strings.Split(in, ":")
if _, err := ipa.ParseIP(split[0]); err != nil {
return false
}
if _, err := strconv.Atoi(in); err != nil {
return false
}
s.mu.RLock()
if _, ok := s.swampmap[in]; ok {
s.mu.RUnlock()
return false
}
s.mu.RUnlock()
return true
}
// LoadProxyTXT loads proxies from a given seed file and randomly feeds them to the workers.
func (s *Swamp) LoadProxyTXT(seedFile string) error {
s.dbgPrint("LoadProxyTXT start")
func (s *Swamp) LoadProxyTXT(seedFile string) int {
var count int
s.dbgPrint("LoadProxyTXT start: "+seedFile)
defer s.dbgPrint("LoadProxyTXT finished: " + strconv.Itoa(count))
f, err := os.Open(seedFile)
if err != nil {
return err
panic(err)
return 0
}
scan := bufio.NewScanner(f)
for scan.Scan() {
s.mu.Lock()
s.scvm = append(s.scvm, scan.Text())
s.mu.Unlock()
if !s.stage1(scan.Text()) {
continue
}
go s.LoadSingleProxy(scan.Text())
count++
}
if err := f.Close(); err != nil {
s.dbgPrint(err.Error())
return err
return count
}
return nil
return count
}
// LoadSingleProxy loads a SOCKS proxy into our queue as the format: 127.0.0.1:1080 (host:port)
func (s *Swamp) LoadSingleProxy(sock string) error {
if !strings.Contains(sock, ":") {
return errors.New("missing colon/missing port")
}
split := strings.Split(sock, ":")
if _, err := ipa.ParseIP(split[0]); err != nil {
return errors.New(split[0] + "is not an IP address")
}
if _, err := strconv.Atoi(split[1]); err != nil {
return errors.New(split[1] + "is not a number")
}
s.mu.Lock()
s.scvm = append(s.scvm, sock)
s.mu.Unlock()
return nil
func (s *Swamp) LoadSingleProxy(sock string) {
inChan <- sock
}
// LoadMultiLineString loads a multiine string object with one (host:port) SOCKS proxy per line
func (s *Swamp) LoadMultiLineString(socks string) (int, error) {
func (s *Swamp) LoadMultiLineString(socks string) int {
var count int
scan := bufio.NewScanner(strings.NewReader(socks))
for scan.Scan() {
if err := s.LoadSingleProxy(scan.Text()); err == nil {
count++
}
go s.LoadSingleProxy(scan.Text())
count++
}
if count < 1 {
return 0, errors.New("no valid host:ip entries found in string")
return 0
}
return count, nil
return count
}
// ClearSOCKSList clears the slice of proxies that we continually draw from at random for validation
@ -72,5 +91,5 @@ func (s *Swamp) LoadMultiLineString(socks string) (int, error) {
func (s *Swamp) ClearSOCKSList() {
s.mu.Lock()
defer s.mu.Unlock()
s.scvm = []string{}
s.swampmap = make(map[string]*Proxy)
}

View File

@ -15,23 +15,6 @@ import (
"h12.io/socks"
)
func (s *Swamp) feed() {
s.dbgPrint("swamp feed start")
for {
if s.Status == Paused {
return
}
select {
case s.Pending <- randStrChoice(s.scvm):
//
case <-s.quit:
s.dbgPrint("feed() paused")
return
default:
time.Sleep(1 * time.Second)
}
}
}
func (s *Swamp) checkHTTP(sock Proxy) (string, error) {
req, err := http.NewRequest("GET", s.GetRandomEndpoint(), bytes.NewBuffer([]byte("")))
@ -168,20 +151,3 @@ func (s *Swamp) validate() {
}
}
}
func (s *Swamp) tossUp() {
s.dbgPrint("tossUp() proxy checking loop start")
for {
if s.Status == Paused {
return
}
select {
case <-s.quit:
s.dbgPrint("tossUp() paused")
return
default:
go s.pool.Submit(s.validate)
time.Sleep(time.Duration(10) * time.Millisecond)
}
}
}