begin implementing authentication and fix insane channel buffer sizes

This commit is contained in:
kayos@tcp.direct 2021-10-23 07:26:23 -07:00
父節點 94a2a3160d
當前提交 2eaeb6c3fd
共有 6 個檔案被更改,包括 117 行新增21 行删除

查看文件

@ -5,6 +5,7 @@ import "errors"
// SwampStatus represents the current state of our Swamp.
type SwampStatus uint32
const (
// Running means the proxy pool is currently taking in proxys and validating them, and is available to dispense proxies.
Running SwampStatus = iota
@ -43,22 +44,31 @@ func (s *Swamp) Pause() error {
if s.IsRunning() {
return errors.New("already paused")
}
s.mu.RLock()
for n := s.runningdaemons; n > 0; n-- {
s.quit <- true
s.dbgPrint("pausing...")
var svcbuf = make(chan bool, s.svcStatus())
for s.svcStatus() != 0 {
svcbuf <- true
select {
case q := <- svcbuf:
s.quit <- q
default:
}
}
s.mu.RUnlock()
s.Status = Paused
return nil
}
// Resume will resume pause proxy pool operations, attempting to resume a running Swamp is a non-op.
// Resume will resume pause proxy pool operations, attempting to resume a running Swamp is returns an error.
func (s *Swamp) Resume() error {
if !s.IsRunning() && s.Status != New {
s.conductor = make(chan bool, 2)
if s.IsRunning() || s.Status == New {
return errors.New("not paused")
}
go s.mapBuilder()
go s.jobSpawner()
<-s.conductor
return nil
}

查看文件

@ -3,15 +3,28 @@ package Prox5
import (
"errors"
"strconv"
"strings"
"sync"
)
func (s *Swamp) svcUp() {
s.mu.Lock()
s.runningdaemons++
s.conductor <- true
s.mu.Unlock()
}
func (s *Swamp) svcDown() {
s.mu.Lock()
s.runningdaemons--
s.mu.Unlock()
}
func (s *Swamp) svcStatus() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.runningdaemons
}
type swampMap struct {
@ -23,6 +36,17 @@ type swampMap struct {
func (sm swampMap) add(sock string) (*Proxy, bool) {
sm.mu.Lock()
defer sm.mu.Unlock()
var auth = &proxyAuth{}
if strings.Contains(sock, "@") {
split := strings.Split(sock, "@")
sock = split[1]
authsplit := strings.Split(split[0], ":")
auth.username = authsplit[0]
auth.password = authsplit[1]
}
if sm.exists(sock) {
return nil, false
}
@ -63,16 +87,19 @@ func (sm swampMap) clear() {
func (s *Swamp) mapBuilder() {
var filtered string
var ok bool
s.svcUp()
s.dbgPrint("map builder started")
defer func() {
s.svcDown()
s.dbgPrint("map builder paused")
}()
s.dbgPrint("map builder started")
for {
select {
case in := <-inChan:
if filtered, ok = s.stage1(in); !ok {
if filtered, ok = filter(in); !ok {
continue
}
if p, ok := s.swampmap.add(filtered); !ok {

16
defs.go
查看文件

@ -46,6 +46,7 @@ type Swamp struct {
pool *ants.Pool
swampopt *swampOptions
runningdaemons int
conductor chan bool
mu *sync.RWMutex
}
@ -153,6 +154,11 @@ const (
stateLocked
)
type proxyAuth struct {
username string
password string
}
// Proxy represents an individual proxy
type Proxy struct {
// Endpoint is the address:port of the proxy that we connect to
@ -168,6 +174,8 @@ type Proxy struct {
// timesBad is the amount of times the proxy has been marked as bad.
timesBad atomic.Value
auth *proxyAuth
parent *Swamp
lock uint32
}
@ -182,10 +190,10 @@ func (sock Proxy) UniqueKey() string {
// After calling this you can use the various "setters" to change the options before calling Swamp.Start().
func NewDefaultSwamp() *Swamp {
s := &Swamp{
ValidSocks5: make(chan *Proxy, 10000000),
ValidSocks4: make(chan *Proxy, 10000000),
ValidSocks4a: make(chan *Proxy, 10000000),
Pending: make(chan *Proxy, 10000000),
ValidSocks5: make(chan *Proxy, 1000000),
ValidSocks4: make(chan *Proxy, 1000000),
ValidSocks4a: make(chan *Proxy, 1000000),
Pending: make(chan *Proxy, 1000000),
Stats: &Statistics{
Valid4: 0,

查看文件

@ -18,18 +18,53 @@ func init() {
inChan = make(chan string, 1000000)
}
func (s *Swamp) stage1(in string) (string, bool) {
func filter(in string) (filtered string, ok bool) {
if !strings.Contains(in, ":") {
return in, false
}
split := strings.Split(in, ":")
if _, err := ipa.ParseIP(split[0]); err != nil {
switch len(split) {
case 1:
return in, false
case 2:
combo, err := ipa.ParseIPPort(in)
if err != nil {
return in, false
}
return combo.String(), true
case 3:
combo, err := ipa.ParseIPPort(split[0] + ":" + split[1])
if err != nil {
return in, false
}
return fmt.Sprintf("%s:%s@%s", split[2], split[3], combo.String()), true
default:
if !strings.Contains(split[0], "[") || !strings.Contains(split[0], "]") {
return in, false
}
}
split = strings.Split(in, "]:")
if len(split) != 2 {
return in, false
}
if _, err := strconv.Atoi(split[1]); err != nil {
combo, err := ipa.ParseIPPort(split[0] + "]:" + split[1])
if err != nil {
return in, false
}
return fmt.Sprintf("%s:%s", split[0], split[1]), true
if !strings.Contains(split[1], ":") {
return combo.String(), true
}
split6 := strings.Split(split[1], ":")
if len(split6) != 2 {
return in, false
}
return fmt.Sprintf("%s:%s@%s", split6[0], split6[1], combo.String()), true
}
// LoadProxyTXT loads proxies from a given seed file and feeds them to the mapBuilder to be later queued automatically for validation.
@ -48,7 +83,7 @@ func (s *Swamp) LoadProxyTXT(seedFile string) int {
scan := bufio.NewScanner(f)
for scan.Scan() {
if filtered, ok = s.stage1(scan.Text()); !ok {
if filtered, ok = filter(scan.Text()); !ok {
continue
}
go s.LoadSingleProxy(filtered)
@ -57,7 +92,6 @@ func (s *Swamp) LoadProxyTXT(seedFile string) int {
if err := f.Close(); err != nil {
s.dbgPrint(err.Error())
return count
}
return count
@ -83,7 +117,7 @@ func (s *Swamp) LoadMultiLineString(socks string) int {
}
// ClearSOCKSList clears the map of proxies that we have on record.
// Other operations (proxies that are still in buffered channels) will continue unless paused.
// Other operations (proxies that are still in buffered channels) will continue.
func (s *Swamp) ClearSOCKSList() {
s.swampmap.clear()
}

查看文件

@ -32,3 +32,12 @@ func randSleep() {
quiccmaffs.Seed(time.Now().UnixNano())
time.Sleep(time.Duration(quiccmaffs.Intn(500)) * time.Millisecond)
}
func allNil(obj ...interface{}) bool {
for _, o := range obj {
if o != nil {
return false
}
}
return true
}

查看文件

@ -109,9 +109,16 @@ func (s *Swamp) checkHTTP(sock *Proxy) (string, error) {
return string(rbody), err
}
func (s *Swamp) singleProxyCheck(sock *Proxy) error {
func (s *Swamp) anothaOne() {
s.Stats.Checked++
if _, err := net.DialTimeout("tcp", sock.Endpoint, s.swampopt.validationTimeout.Load().(time.Duration)); err != nil {
}
func (s *Swamp) singleProxyCheck(sock *Proxy) error {
defer s.anothaOne()
if _, err := net.DialTimeout("tcp", sock.Endpoint,
s.swampopt.validationTimeout.Load().(time.Duration)); err != nil {
s.badProx.Check(sock)
return err
}
@ -128,6 +135,7 @@ func (s *Swamp) singleProxyCheck(sock *Proxy) error {
}
sock.ProxiedIP = resp
return nil
}