Major but non-breaking: from pond to ants

这个提交包含在:
kayos@tcp.direct 2021-09-23 00:38:00 -07:00
父节点 81df33fd6e
当前提交 7ed5933b11
共有 11 个文件被更改,包括 120 次插入63 次删除

查看文件

@ -2,16 +2,30 @@ package pxndscvm
import "errors"
// SwampStatus represents the current state of our Swamp.
type SwampStatus uint32
const (
// Running means the proxy pool is currently taking in proxys and validating them, and is available to dispense proxies.
Running SwampStatus = iota
// Paused means the proxy pool has been with Swamp.Pause() and may be resumed with Swamp.Resume()
Paused
// New means the proxy pool has never been started.
New
)
// Start starts our proxy pool operations. Trying to start a running Swamp will return an error.
func (s *Swamp) Start() error {
if s.started {
return errors.New("already running")
s.mu.RLock()
if s.Status != New {
s.mu.RUnlock()
return errors.New("this swamp is not new, use resume if it is paused")
}
s.mu.RUnlock()
// mapBuilder builds deduplicated map with valid ips and ports
go s.mapBuilder()
// tossUp feeds jobs to pond continuously
go s.jobSpawner()
s.started = true
s.mu.Lock()
s.Status = Running
s.mu.Unlock()
@ -26,22 +40,24 @@ Pause will cease the creation of any new proxy validation operations.
* Pausing an already paused Swamp is a nonop.
*/
func (s *Swamp) Pause() error {
if s.Status == Paused {
if s.IsRunning() {
return errors.New("already paused")
}
for n := 2; n > 0; n-- {
s.mu.RLock()
for n := s.runningdaemons; n > 0; n-- {
s.quit <- true
}
s.mu.RUnlock()
s.Status = Paused
return nil
}
// Resume will resume pause proxy pool operations, attempting to resume a running Swamp is a non-op.
func (s *Swamp) Resume() error {
if s.Status != Paused {
if !s.IsRunning() && s.Status != New {
return errors.New("not paused")
}
s.Status = Running
go s.mapBuilder()
go s.jobSpawner()
return nil

查看文件

@ -2,6 +2,7 @@ package pxndscvm
import (
"errors"
"strconv"
"sync"
"time"
)
@ -61,6 +62,8 @@ func (sm swampMap) clear() {
}
func (s *Swamp) mapBuilder() {
var filtered string
var ok bool
s.svcUp()
s.dbgPrint("map builder started")
defer func() {
@ -70,10 +73,10 @@ func (s *Swamp) mapBuilder() {
for {
select {
case in := <-inChan:
if !s.stage1(in) {
if filtered, ok = s.stage1(in); !ok {
continue
}
if p, ok := s.swampmap.add(in); !ok {
if p, ok := s.swampmap.add(filtered); !ok {
continue
} else {
s.Pending <- p
@ -81,11 +84,33 @@ func (s *Swamp) mapBuilder() {
case <-s.quit:
return
default:
time.Sleep(100 * time.Millisecond)
count := s.recycling()
s.dbgPrint(ylw + "recycled " + strconv.Itoa(count) + " proxies from our map" + rst)
}
}
}
func (s *Swamp) recycling() int {
if !s.GetRecyclingStatus() {
return 0
}
s.mu.RLock()
defer s.mu.RUnlock()
if len(s.swampmap.plot) < 1 {
return 0
}
var count int
for _, sock := range s.swampmap.plot {
select {
case s.Pending <- sock:
count++
default:
continue
}
}
return count
}
func (s *Swamp) jobSpawner() {
s.svcUp()
s.dbgPrint("job spawner started")
@ -101,10 +126,12 @@ func (s *Swamp) jobSpawner() {
case <-s.quit:
return
case sock := <-s.Pending:
go s.pool.Submit(sock.validate)
if err := s.pool.Submit(sock.validate); err != nil {
s.dbgPrint(ylw+err.Error()+rst)
}
time.Sleep(time.Duration(10) * time.Millisecond)
default:
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
}
}

查看文件

@ -2,16 +2,30 @@ package pxndscvm
import (
"sync"
rate5 "github.com/yunginnanet/Rate5"
)
var (
useDebugChannel = false
debugChan chan string
debugMutex *sync.RWMutex
debugRatelimit *rate5.Limiter
)
type debugLine struct {
s string
}
// UniqueKey implements rate5's Identity interface.
// https://pkg.go.dev/github.com/yunginnanet/Rate5#Identity
func (dbg debugLine) UniqueKey() string {
return dbg.s
}
func init() {
debugMutex = &sync.RWMutex{}
debugRatelimit = rate5.NewStrictLimiter(120, 2)
}
// DebugChannel will return a channel which will receive debug messages once debug is enabled.
@ -55,6 +69,11 @@ func (s *Swamp) dbgPrint(str string) {
if !s.swampopt.debug {
return
}
if debugRatelimit.Check(debugLine{s: str}) {
return
}
if useDebugChannel {
go func() {
debugChan <- str

42
defs.go
查看文件

@ -5,20 +5,10 @@ import (
"sync"
"time"
"github.com/alitto/pond"
"github.com/panjf2000/ants/v2"
rl "github.com/yunginnanet/Rate5"
)
// SwampStatus represents the current state of our Swamp.
type SwampStatus int
const (
// Running means the proxy pool is currently taking in proxys and validating them, and is available to dispense proxies.
Running SwampStatus = iota
// Paused means the proxy pool has been with Swamp.Pause() and may be resumed with Swamp.Resume()
Paused
)
// Swamp represents a proxy pool
type Swamp struct {
// ValidSocks5 is a constant stream of verified ValidSocks5 proxies
@ -44,10 +34,9 @@ type Swamp struct {
swampmap swampMap
pool *pond.WorkerPool
pool *ants.Pool
swampopt *swampOptions
runningdaemons int
started bool
mu *sync.RWMutex
}
@ -88,7 +77,7 @@ func defOpt() *swampOptions {
useProxConfig: defUseProx,
badProxConfig: defBadProx,
removeafter: 10,
removeafter: 5,
recycle: true,
validationTimeout: 5,
@ -159,10 +148,10 @@ func (sock Proxy) UniqueKey() string {
// After calling this you can use the various "setters" to change the options before calling Swamp.Start().
func NewDefaultSwamp() *Swamp {
s := &Swamp{
ValidSocks5: make(chan *Proxy, 100000),
ValidSocks4: make(chan *Proxy, 100000),
ValidSocks4a: make(chan *Proxy, 100000),
Pending: make(chan *Proxy, 100000),
ValidSocks5: make(chan *Proxy, 1000000),
ValidSocks4: make(chan *Proxy, 1000000),
ValidSocks4a: make(chan *Proxy, 1000000),
Pending: make(chan *Proxy, 100000000),
Stats: &Statistics{
Valid4: 0,
@ -189,13 +178,26 @@ func NewDefaultSwamp() *Swamp {
s.useProx = rl.NewCustomLimiter(s.swampopt.useProxConfig)
s.badProx = rl.NewCustomLimiter(s.swampopt.badProxConfig)
s.pool = pond.New(s.swampopt.maxWorkers, 1000000, pond.PanicHandler(func(p interface{}) {
fmt.Println("WORKER PANIC! ", p)
var err error
s.pool, err = ants.NewPool(s.swampopt.maxWorkers, ants.WithOptions(ants.Options{
ExpiryDuration: 5 * time.Minute,
PreAlloc: true,
PanicHandler: s.pondPanic,
}))
if err != nil {
s.dbgPrint(red+"CRITICAL: "+err.Error()+rst)
panic(err)
}
return s
}
func (s *Swamp) pondPanic(p interface{}) {
fmt.Println("WORKER PANIC! ", p)
s.dbgPrint(red + "PANIC! " + fmt.Sprintf("%v", p))
}
// defaultUserAgents is a small list of user agents to use during validation.
var defaultUserAgents = []string{
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:60.0) Gecko/20100101 Firefox/60.0",

查看文件

@ -54,6 +54,14 @@ func (s *Swamp) GetRecyclingStatus() bool {
return s.swampopt.recycle
}
// GetWorkers retrieves pond worker statistics:
// * return MaxWorkers, RunningWorkers, IdleWorkers
func (s *Swamp) GetWorkers() (int, int, int) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.pool.Cap(), s.pool.Running(), s.pool.Free()
}
// GetRemoveAfter retrieves the removeafter policy, the amount of times a recycled proxy is marked as bad until it is removed entirely.
// * returns -1 if recycling is disabled.
func (s *Swamp) GetRemoveAfter() int {
@ -64,11 +72,3 @@ func (s *Swamp) GetRemoveAfter() int {
}
return s.swampopt.removeafter
}
// GetWorkers retrieves pond worker statistics:
// * return MaxWorkers, RunningWorkers, IdleWorkers
func (s *Swamp) GetWorkers() (int, int, int) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.pool.MaxWorkers(), s.pool.RunningWorkers(), s.pool.IdleWorkers()
}

2
go.mod
查看文件

@ -3,9 +3,9 @@ module git.tcp.direct/kayos/pxndscvm
go 1.17
require (
github.com/alitto/pond v1.5.1
github.com/h12w/go-socks5 v0.0.0-20200522160539-76189e178364
github.com/mattn/go-tty v0.0.3
github.com/panjf2000/ants/v2 v2.4.6
github.com/yunginnanet/Rate5 v0.0.0-20210918144058-dfa3665041eb
golang.org/x/net v0.0.0-20210908191846-a5e095526f91
h12.io/socks v1.0.3

查看文件

@ -2,6 +2,7 @@ package pxndscvm
import (
"bufio"
"fmt"
"os"
"strconv"
"strings"
@ -17,23 +18,25 @@ func init() {
inChan = make(chan string, 100000)
}
func (s *Swamp) stage1(in string) bool {
func (s *Swamp) stage1(in string) (string, bool) {
if !strings.Contains(in, ":") {
return false
return in, false
}
split := strings.Split(in, ":")
if _, err := ipa.ParseIP(split[0]); err != nil {
return false
return in, false
}
if _, err := strconv.Atoi(split[1]); err != nil {
return false
return in, false
}
return true
return fmt.Sprintf("%s:%s", split[0], split[1]), true
}
// LoadProxyTXT loads proxies from a given seed file and feeds them to the mapBuilder to be later queued automatically for validation.
func (s *Swamp) LoadProxyTXT(seedFile string) int {
var count int
var filtered string
var ok bool
s.dbgPrint("LoadProxyTXT start: " + seedFile)
defer s.dbgPrint("LoadProxyTXT finished: " + strconv.Itoa(count))
@ -45,10 +48,10 @@ func (s *Swamp) LoadProxyTXT(seedFile string) int {
scan := bufio.NewScanner(f)
for scan.Scan() {
if !s.stage1(scan.Text()) {
if filtered, ok = s.stage1(scan.Text()); !ok {
continue
}
go s.LoadSingleProxy(scan.Text())
go s.LoadSingleProxy(filtered)
count++
}

查看文件

@ -1,11 +1,7 @@
package pxndscvm
import (
"errors"
"fmt"
"time"
"github.com/alitto/pond"
)
// AddUserAgents appends to the list of useragents we randomly choose from during proxied requests
@ -53,17 +49,7 @@ func (s *Swamp) SetValidationTimeout(newtimeout int) {
// SetMaxWorkers set the maximum workers for proxy checking and clears the current proxy map and worker pool jobs.
func (s *Swamp) SetMaxWorkers(num int) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.Status == Running {
return errors.New("can't change max workers during proxypool operation, try pausing first")
}
s.pool.StopAndWait()
s.swampopt.maxWorkers = num
s.pool = pond.New(s.swampopt.maxWorkers, 1000000, pond.PanicHandler(func(p interface{}) {
fmt.Println("WORKER PANIC! ", p)
}))
s.swampmap.plot = make(map[string]*Proxy)
s.pool.Tune(num)
return nil
}

查看文件

@ -17,6 +17,8 @@ type Statistics struct {
Dispensed int
// Stale is the amount of proxies that failed our stale policy upon dispensing
Stale int
// Checked is the amount of proxies we've checked.
Checked int
// birthday represents the time we started checking proxies with this pool
birthday time.Time
mu *sync.Mutex

查看文件

@ -7,6 +7,7 @@ import (
const (
grn = "\033[32m"
red = "\033[31m"
ylw = "\033[33m"
rst = "\033[0m"
)

查看文件

@ -72,6 +72,7 @@ func (s *Swamp) checkHTTP(sock *Proxy) (string, error) {
}
func (s *Swamp) singleProxyCheck(sock *Proxy) error {
s.Stats.Checked++
if _, err := net.DialTimeout("tcp", sock.Endpoint, time.Duration(s.GetValidationTimeout())*time.Second); err != nil {
s.badProx.Check(sock)
return err