Use pool implementation from common package

This commit is contained in:
kayos@tcp.direct 2022-10-15 23:45:19 -07:00
parent aae6c2ece7
commit a6969af053
Signed by: kayos
GPG Key ID: 4B841471B4BEE979
10 changed files with 101 additions and 143 deletions

View File

@ -3,11 +3,8 @@ package prox5
import ( import (
"errors" "errors"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"git.tcp.direct/kayos/prox5/internal/pools"
) )
type swampMap struct { type swampMap struct {
@ -65,7 +62,6 @@ func (sm swampMap) clear() {
} }
} }
func (p5 *Swamp) recycling() int { func (p5 *Swamp) recycling() int {
if !p5.GetRecyclingStatus() { if !p5.GetRecyclingStatus() {
return 0 return 0
@ -118,10 +114,10 @@ func (p5 *Swamp) jobSpawner() {
default: default:
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
count := p5.recycling() count := p5.recycling()
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("recycled ") buf.MustWriteString("recycled ")
buf.WriteString(strconv.Itoa(count)) buf.MustWriteString(strconv.Itoa(count))
buf.WriteString(" proxies from our map") buf.MustWriteString(" proxies from our map")
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
} }

View File

@ -2,11 +2,10 @@ package prox5
import ( import (
"fmt" "fmt"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"git.tcp.direct/kayos/prox5/internal/pools" "git.tcp.direct/kayos/common/pool"
) )
var ( var (
@ -30,8 +29,8 @@ type SocksLogger struct {
// Printf is used to handle socks server logging. // Printf is used to handle socks server logging.
func (s SocksLogger) Printf(format string, a ...interface{}) { func (s SocksLogger) Printf(format string, a ...interface{}) {
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString(fmt.Sprintf(format, a...)) buf.MustWriteString(fmt.Sprintf(format, a...))
s.parent.dbgPrint(buf) s.parent.dbgPrint(buf)
} }
@ -72,14 +71,14 @@ func (p5 *Swamp) DisableDebug() {
atomic.StoreUint32(debugStatus, debugDisabled) atomic.StoreUint32(debugStatus, debugDisabled)
} }
func simpleString(s string) *strings.Builder { func simpleString(s string) *pool.String {
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString(s) buf.MustWriteString(s)
return buf return buf
} }
func (p5 *Swamp) dbgPrint(builder *strings.Builder) { func (p5 *Swamp) dbgPrint(builder *pool.String) {
defer pools.DiscardBuffer(builder) defer strs.MustPut(builder)
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
@ -91,20 +90,20 @@ func (p5 *Swamp) msgUnableToReach(socksString, target string, err error) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("unable to reach ") buf.MustWriteString("unable to reach ")
if p5.swampopt.redact { if p5.swampopt.redact {
buf.WriteString("[redacted]") buf.MustWriteString("[redacted]")
} else { } else {
buf.WriteString(target) buf.MustWriteString(target)
} }
buf.WriteString(" with ") buf.MustWriteString(" with ")
buf.WriteString(socksString) buf.MustWriteString(socksString)
if !p5.swampopt.redact { if !p5.swampopt.redact {
buf.WriteString(": ") buf.MustWriteString(": ")
buf.WriteString(err.Error()) buf.MustWriteString(err.Error())
} }
buf.WriteString(", cycling...") buf.MustWriteString(", cycling...")
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -112,9 +111,9 @@ func (p5 *Swamp) msgUsingProxy(socksString string) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("MysteryDialer using socks: ") buf.MustWriteString("MysteryDialer using socks: ")
buf.WriteString(socksString) buf.MustWriteString(socksString)
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -122,10 +121,10 @@ func (p5 *Swamp) msgFailedMiddleware(socksString string) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("failed middleware check, ") buf.MustWriteString("failed middleware check, ")
buf.WriteString(socksString) buf.MustWriteString(socksString)
buf.WriteString(", cycling...") buf.MustWriteString(", cycling...")
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -133,9 +132,9 @@ func (p5 *Swamp) msgTry(socksString string) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("try dial with: ") buf.MustWriteString("try dial with: ")
buf.WriteString(socksString) buf.MustWriteString(socksString)
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -143,11 +142,11 @@ func (p5 *Swamp) msgCantGetLock(socksString string, putback bool) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("can't get lock for ") buf.MustWriteString("can't get lock for ")
buf.WriteString(socksString) buf.MustWriteString(socksString)
if putback { if putback {
buf.WriteString(", putting back in queue") buf.MustWriteString(", putting back in queue")
} }
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -156,9 +155,9 @@ func (p5 *Swamp) msgGotLock(socksString string) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("got lock for ") buf.MustWriteString("got lock for ")
buf.WriteString(socksString) buf.MustWriteString(socksString)
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -166,18 +165,18 @@ func (p5 *Swamp) msgChecked(sock *Proxy, success bool) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
if !success { if !success {
buf.WriteString("failed to verify: ") buf.MustWriteString("failed to verify: ")
buf.WriteString(sock.Endpoint) buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf) p5.dbgPrint(buf)
return return
} }
buf.WriteString("verified ") buf.MustWriteString("verified ")
buf.WriteString(sock.Endpoint) buf.MustWriteString(sock.Endpoint)
buf.WriteString(" as ") buf.MustWriteString(" as ")
buf.WriteString(sock.protocol.Get().String()) buf.MustWriteString(sock.protocol.Get().String())
buf.WriteString(" proxy") buf.MustWriteString(" proxy")
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }
@ -185,9 +184,9 @@ func (p5 *Swamp) msgBadProxRate(sock *Proxy) {
if !p5.DebugEnabled() { if !p5.DebugEnabled() {
return return
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("badProx ratelimited: ") buf.MustWriteString("badProx ratelimited: ")
buf.WriteString(sock.Endpoint) buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf) p5.dbgPrint(buf)
} }

View File

@ -2,7 +2,6 @@ package prox5
import ( import (
"context" "context"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -10,7 +9,6 @@ import (
"github.com/panjf2000/ants/v2" "github.com/panjf2000/ants/v2"
rl "github.com/yunginnanet/Rate5" rl "github.com/yunginnanet/Rate5"
"git.tcp.direct/kayos/prox5/internal/pools"
"git.tcp.direct/kayos/prox5/logger" "git.tcp.direct/kayos/prox5/logger"
) )
@ -185,9 +183,9 @@ func NewProxyEngine() *Swamp {
})) }))
if err != nil { if err != nil {
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("CRITICAL: ") buf.MustWriteString("CRITICAL: ")
buf.WriteString(err.Error()) buf.MustWriteString(err.Error())
pe.dbgPrint(buf) pe.dbgPrint(buf)
panic(err) panic(err)
} }

View File

@ -1,11 +1,8 @@
package prox5 package prox5
import ( import (
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
"git.tcp.direct/kayos/prox5/internal/pools"
) )
// Socks5Str gets a SOCKS5 proxy that we have fully verified (dialed and then retrieved our IP address from a what-is-my-ip endpoint. // Socks5Str gets a SOCKS5 proxy that we have fully verified (dialed and then retrieved our IP address from a what-is-my-ip endpoint.
@ -105,9 +102,9 @@ func (p5 *Swamp) stillGood(sock *Proxy) bool {
defer atomic.StoreUint32(&sock.lock, stateUnlocked) defer atomic.StoreUint32(&sock.lock, stateUnlocked)
if atomic.LoadInt64(&sock.timesBad) > int64(p5.GetRemoveAfter()) && p5.GetRemoveAfter() != -1 { if atomic.LoadInt64(&sock.timesBad) > int64(p5.GetRemoveAfter()) && p5.GetRemoveAfter() != -1 {
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("deleting from map (too many failures): ") buf.MustWriteString("deleting from map (too many failures): ")
buf.WriteString(sock.Endpoint) buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf) p5.dbgPrint(buf)
if err := p5.swampmap.delete(sock.Endpoint); err != nil { if err := p5.swampmap.delete(sock.Endpoint); err != nil {
p5.dbgPrint(simpleString(err.Error())) p5.dbgPrint(simpleString(err.Error()))
@ -115,17 +112,17 @@ func (p5 *Swamp) stillGood(sock *Proxy) bool {
} }
if p5.badProx.Peek(sock) { if p5.badProx.Peek(sock) {
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("badProx dial ratelimited: ") buf.MustWriteString("badProx dial ratelimited: ")
buf.WriteString(sock.Endpoint) buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf) p5.dbgPrint(buf)
return false return false
} }
if time.Since(sock.lastValidated) > p5.swampopt.stale { if time.Since(sock.lastValidated) > p5.swampopt.stale {
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("proxy stale: ") buf.MustWriteString("proxy stale: ")
buf.WriteString(sock.Endpoint) buf.MustWriteString(sock.Endpoint)
p5.dbgPrint(buf) p5.dbgPrint(buf)
go p5.stats.stale() go p5.stats.stale()
return false return false

View File

@ -1,13 +0,0 @@
package pools
import (
"strings"
"sync"
)
var CopABuffer = &sync.Pool{New: func() interface{} { return &strings.Builder{} }}
func DiscardBuffer(buf *strings.Builder) {
buf.Reset()
CopABuffer.Put(buf)
}

View File

@ -4,13 +4,10 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
"git.tcp.direct/kayos/socks" "git.tcp.direct/kayos/socks"
"git.tcp.direct/kayos/prox5/internal/pools"
) )
// DialContext is a simple stub adapter to implement a net.Dialer. // DialContext is a simple stub adapter to implement a net.Dialer.
@ -34,13 +31,13 @@ func (p5 *Swamp) DialTimeout(network, addr string, timeout time.Duration) (net.C
} }
func (p5 *Swamp) addTimeout(socksString string) string { func (p5 *Swamp) addTimeout(socksString string) string {
tout := pools.CopABuffer.Get().(*strings.Builder) tout := strs.Get()
tout.WriteString(socksString) tout.MustWriteString(socksString)
tout.WriteString("?timeout=") tout.MustWriteString("?timeout=")
tout.WriteString(p5.GetServerTimeoutStr()) tout.MustWriteString(p5.GetServerTimeoutStr())
tout.WriteRune('s') _, _ = tout.WriteRune('s')
socksString = tout.String() socksString = tout.String()
pools.DiscardBuffer(tout) strs.MustPut(tout)
return socksString return socksString
} }

View File

@ -6,8 +6,6 @@ import (
"github.com/miekg/dns" "github.com/miekg/dns"
ipa "inet.af/netaddr" ipa "inet.af/netaddr"
"git.tcp.direct/kayos/prox5/internal/pools"
) )
func filterv6(in string) (filtered string, ok bool) { func filterv6(in string) (filtered string, ok bool) {
@ -44,22 +42,21 @@ func isNumber(s string) bool {
} }
func buildProxyString(username, password, address, port string, v6 bool) (result string) { func buildProxyString(username, password, address, port string, v6 bool) (result string) {
builder := pools.CopABuffer.Get().(*strings.Builder) builder := strs.Get()
defer strs.MustPut(builder)
if username != "" && password != "" { if username != "" && password != "" {
builder.WriteString(username) builder.MustWriteString(username)
builder.WriteString(":") builder.MustWriteString(":")
builder.WriteString(password) builder.MustWriteString(password)
builder.WriteString("@") builder.MustWriteString("@")
} }
builder.WriteString(address) builder.MustWriteString(address)
if v6 { if v6 {
builder.WriteString("]") builder.MustWriteString("]")
} }
builder.WriteString(":") builder.MustWriteString(":")
builder.WriteString(port) builder.MustWriteString(port)
result = builder.String() return builder.String()
pools.DiscardBuffer(builder)
return
} }
func filter(in string) (filtered string, ok bool) { //nolint:cyclop func filter(in string) (filtered string, ok bool) { //nolint:cyclop

View File

@ -1,12 +1,9 @@
package prox5 package prox5
import ( import (
"strings"
"time" "time"
rl "github.com/yunginnanet/Rate5" rl "github.com/yunginnanet/Rate5"
"git.tcp.direct/kayos/prox5/internal/pools"
) )
// https://pkg.go.dev/github.com/yunginnanet/Rate5#Policy // https://pkg.go.dev/github.com/yunginnanet/Rate5#Policy
@ -57,23 +54,15 @@ func (sock *Proxy) GetProto() ProxyProtocol {
// GetProto safely retrieves the protocol value of the Proxy. // GetProto safely retrieves the protocol value of the Proxy.
func (sock *Proxy) String() string { func (sock *Proxy) String() string {
tout := "" buf := strs.Get()
defer strs.MustPut(buf)
buf.MustWriteString(sock.GetProto().String())
buf.MustWriteString("://")
buf.MustWriteString(sock.Endpoint)
if sock.parent.GetServerTimeoutStr() != "-1" { if sock.parent.GetServerTimeoutStr() != "-1" {
tbuf := pools.CopABuffer.Get().(*strings.Builder) buf.MustWriteString("?timeout=")
tbuf.WriteString("?timeout=") buf.MustWriteString(sock.parent.GetServerTimeoutStr())
tbuf.WriteString(sock.parent.GetServerTimeoutStr()) buf.MustWriteString("s")
tbuf.WriteString("s")
tout = tbuf.String()
pools.DiscardBuffer(tbuf)
} }
buf := pools.CopABuffer.Get().(*strings.Builder) return buf.String()
buf.WriteString(sock.GetProto().String())
buf.WriteString("://")
buf.WriteString(sock.Endpoint)
if tout != "" {
buf.WriteString(tout)
}
out := buf.String()
pools.DiscardBuffer(buf)
return out
} }

View File

@ -1,13 +1,13 @@
package prox5 package prox5
import ( import (
"strings"
"git.tcp.direct/kayos/go-socks5" "git.tcp.direct/kayos/go-socks5"
"git.tcp.direct/kayos/prox5/internal/pools" "git.tcp.direct/kayos/common/pool"
) )
var strs = pool.NewStringFactory()
type socksCreds struct { type socksCreds struct {
username string username string
password string password string
@ -33,9 +33,9 @@ func (p5 *Swamp) StartSOCKS5Server(listen, username, password string) error {
// Resolver: pe.MysteryResolver, // Resolver: pe.MysteryResolver,
} }
buf := pools.CopABuffer.Get().(*strings.Builder) buf := strs.Get()
buf.WriteString("listening for SOCKS5 connections on ") buf.MustWriteString("listening for SOCKS5 connections on ")
buf.WriteString(listen) buf.MustWriteString(listen)
p5.dbgPrint(buf) p5.dbgPrint(buf)
server, err := socks5.New(conf) server, err := socks5.New(conf)

View File

@ -14,8 +14,6 @@ import (
"git.tcp.direct/kayos/socks" "git.tcp.direct/kayos/socks"
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"git.tcp.direct/kayos/prox5/internal/pools"
) )
func (p5 *Swamp) prepHTTP() (*http.Client, *http.Transport, *http.Request, error) { func (p5 *Swamp) prepHTTP() (*http.Client, *http.Transport, *http.Request, error) {
@ -52,15 +50,15 @@ func (sock *Proxy) good() {
} }
func (p5 *Swamp) bakeHTTP(hmd *HandMeDown) (client *http.Client, req *http.Request, err error) { func (p5 *Swamp) bakeHTTP(hmd *HandMeDown) (client *http.Client, req *http.Request, err error) {
builder := pools.CopABuffer.Get().(*strings.Builder) builder := strs.Get()
builder.WriteString(hmd.protoCheck.String()) builder.MustWriteString(hmd.protoCheck.String())
builder.WriteString("://") builder.MustWriteString("://")
builder.WriteString(hmd.sock.Endpoint) builder.MustWriteString(hmd.sock.Endpoint)
builder.WriteString("/?timeout=") builder.MustWriteString("/?timeout=")
builder.WriteString(p5.GetValidationTimeoutStr()) builder.MustWriteString(p5.GetValidationTimeoutStr())
builder.WriteString("s") builder.MustWriteString("s")
dialSocks := socks.DialWithConn(builder.String(), hmd.conn) dialSocks := socks.DialWithConn(builder.String(), hmd.conn)
pools.DiscardBuffer(builder) strs.MustPut(builder)
var ( var (
purl *url.URL purl *url.URL