prox5/prox5_test.go

267 lines
5.8 KiB
Go

package prox5
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"git.tcp.direct/kayos/common/entropy"
"git.tcp.direct/kayos/go-socks5"
)
// failures is a package-wide counter of simulated or observed proxy failures.
// It is incremented by randomFail.fail and by the p5TestLogger print methods,
// and read by TestProx5 when computing the expected validated-proxy count.
var failures = new(atomic.Int64)

// init resets the failure counter and sets PROX5_SCALER_DEBUG=1 in the
// process environment before any test runs.
func init() {
	failures.Store(0)
	// Best effort: a failure to set the variable is deliberately ignored.
	_ = os.Setenv("PROX5_SCALER_DEBUG", "1")
}
// randomFail configures and tracks simulated dial failures for one dummy
// SOCKS server.
type randomFail struct {
	// failedCount is the number of failures triggered so far. It is updated
	// with the sync/atomic functions and is therefore kept as the first field
	// so that it is 64-bit aligned on 32-bit platforms as well.
	failedCount int64
	// t is the test that receives log output and errors.
	t *testing.T
	// maxFail, when greater than zero, is the failure count above which the
	// test is marked as failed.
	maxFail int64
	// failOneOutOf is the upper bound handed to the random number generator;
	// zero disables random failures.
	failOneOutOf int
}
// fail reports whether the current dial attempt should be failed on purpose.
//
// It returns false when failOneOutOf is not positive. Otherwise it draws a
// random number in [0, failOneOutOf) and fails only when the draw equals 1,
// so a failOneOutOf of 1 never fails. Each triggered failure bumps the
// per-server counter and the package-wide failures counter, and marks the
// test as failed once the per-server counter exceeds a positive maxFail.
func (rf *randomFail) fail() bool {
	// A non-positive bound means "never fail". Presumably Intn rejects such
	// bounds the way math/rand does, so negative values are guarded as well.
	if rf.failOneOutOf <= 0 {
		return false
	}
	if entropy.GetOptimizedRand().Intn(rf.failOneOutOf) != 1 {
		return false
	}
	// Use the value returned by the atomic add for logging and comparison;
	// reading rf.failedCount directly would race with concurrent dials.
	count := atomic.AddInt64(&rf.failedCount, 1)
	rf.t.Logf("random SOCKS failure triggered, total fail count: %d", count)
	if rf.maxFail > 0 && count > rf.maxFail {
		rf.t.Errorf("[FAIL] random SOCKS failure triggered too many times, total fail count: %d", count)
	}
	failures.Add(1)
	return true
}
// dummyHTTPServer pairs a listener with the test that created it. Because
// net.Listener is embedded, a *dummyHTTPServer can be handed straight to
// http.Serve and closed via Close.
type dummyHTTPServer struct {
// t is the test the server was created for.
t *testing.T
// Listener is the embedded listener connections are accepted on.
net.Listener
}
// timeNowJSON returns the current time encoded with time.Time.MarshalJSON.
// The marshalling error is discarded.
func timeNowJSON() []byte {
	now := time.Now()
	encoded, _ := now.MarshalJSON()
	return encoded
}
// newDummyHTTPSServer starts a test HTTP server on the given port (all
// interfaces) and registers a cleanup that closes its listener when the test
// finishes. Despite the name, the server speaks plain HTTP via http.Serve.
//
// Every request is answered, after a random delay of under 300ms, with the
// current time as JSON. The test is aborted if the listener cannot be
// created; write and serve errors are reported through t.Error, except for
// the net.ErrClosed that follows the cleanup.
func newDummyHTTPSServer(t *testing.T, port int) {
	t.Helper()

	listener, err := net.Listen("tcp", ":"+strconv.Itoa(port))
	if err != nil {
		// Any listen error is fatal: continuing would hand a nil listener to
		// http.Serve.
		t.Fatal(err)
	}
	dtcp := &dummyHTTPServer{t: t, Listener: listener}

	// Each closure uses its own error variable. Sharing a single one between
	// the serve goroutine and concurrently running handlers is a data race.
	handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		time.Sleep(time.Duration(entropy.RNG(300)) * time.Millisecond)
		if _, writeErr := w.Write(timeNowJSON()); writeErr != nil {
			t.Error("[FAIL] http server failed to write JSON: " + writeErr.Error())
		}
	})

	go func() {
		serveErr := http.Serve(dtcp, handler)
		if serveErr != nil && !errors.Is(serveErr, net.ErrClosed) {
			t.Error("[FAIL] http.Serve error: " + serveErr.Error())
		}
	}()

	t.Cleanup(func() {
		_ = dtcp.Close()
	})
	t.Logf("dummy HTTPS server listening on port %d", port)
}
// ErrRandomFail is returned by the dummy SOCKS server's dialer when a
// simulated random failure is triggered.
var ErrRandomFail = errors.New("random failure")
// dummySOCKSServer starts a SOCKS5 server on 127.0.0.1:port in a background
// goroutine. Its dialer consults a randomFail before every outgoing
// connection and returns ErrRandomFail when a failure is triggered;
// otherwise it sleeps for a random delay of under 300ms and dials the target.
//
// Only the first non-nil element of rf is used. Without one, the server
// never fails on purpose. Listen/serve errors other than net.ErrClosed are
// reported through t.Error.
func dummySOCKSServer(t *testing.T, port int, rf ...*randomFail) {
	t.Helper()

	failure := &randomFail{t: t, failedCount: int64(0), failOneOutOf: 0}
	// A nil entry would cause a nil-pointer dereference on the first dial.
	if len(rf) > 0 && rf[0] != nil {
		failure = rf[0]
	}

	dialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
		if failure.fail() {
			return nil, ErrRandomFail
		}
		time.Sleep(time.Duration(entropy.GetOptimizedRand().Intn(300)) * time.Millisecond)
		// Honour the caller's context so a cancelled request also cancels the
		// outgoing dial. DialContext requires a non-nil context.
		if ctx == nil {
			ctx = context.Background()
		}
		var d net.Dialer
		return d.DialContext(ctx, network, addr)
	}

	server := socks5.NewServer(socks5.WithDial(dialer))
	go func() {
		err := server.ListenAndServe("tcp", "127.0.0.1:"+strconv.Itoa(port))
		if err != nil && !errors.Is(err, net.ErrClosed) {
			t.Error("[FAIL] socks server failure: " + err.Error())
		}
	}()
}
// p5TestLogger forwards log output to a test's log. TestProx5 passes it to
// SetAndEnableDebugLogger on the proxy engine.
type p5TestLogger struct {
// t is the test whose log receives every message.
t *testing.T
}
// Errorf writes the formatted message to the test log with an "[ERROR] "
// prefix. It only logs; the test is not marked as failed.
func (tl p5TestLogger) Errorf(format string, args ...interface{}) {
	const prefix = "[ERROR] "
	tl.t.Logf(prefix+format, args...)
}
// Printf formats the message, bumps the package-wide failures counter when
// the result contains "failed to verify", and writes it to the test log with
// a "[PRINT] " prefix.
func (tl p5TestLogger) Printf(format string, args ...interface{}) {
	val := fmt.Sprintf(format, args...)
	if strings.Contains(val, "failed to verify") {
		failures.Add(1)
	}
	// val is already formatted; logging it with Log rather than Logf keeps
	// any literal '%' in the message from being treated as a format verb.
	tl.t.Log("[PRINT] " + val)
}
// Print renders every argument with %+v, joins the results with single
// spaces, bumps the package-wide failures counter when the message contains
// "failed to verify", and writes it to the test log with a "[PRINT] " prefix.
func (tl p5TestLogger) Print(args ...interface{}) {
	// Format each operand on its own: a single "%+v" verb applied to the
	// whole slice yields %!(EXTRA ...) noise for several arguments and
	// %!v(MISSING) for none.
	parts := make([]string, 0, len(args))
	for _, arg := range args {
		parts = append(parts, fmt.Sprintf("%+v", arg))
	}
	val := strings.Join(parts, " ")
	if strings.Contains(val, "failed to verify") {
		failures.Add(1)
	}
	tl.t.Log("[PRINT] " + val)
}
// TestProx5 is an end-to-end test of the proxy engine against local dummy
// servers.
//
// It starts numTest dummy SOCKS5 servers (100 by default, overridable through
// the PROX5_TEST_COUNT environment variable) on consecutive ports starting at
// 5555, plus one dummy HTTP server on port 8055. The proxies are then fed to
// the engine one at a time while, after a warm-up of 50 ticker ticks, a
// proxied HTTP request is issued on every 100ms tick. After 55 proxies have
// been loaded the engine is paused once and the validated count is compared
// with the expected one; a mismatch is only logged as a warning. The test
// runs for 30 seconds and fails if fewer than 10 requests succeeded.
func TestProx5(t *testing.T) {
	const (
		firstPort  = 5555 // port of the first dummy SOCKS server
		httpPort   = 8055 // port of the dummy HTTP target
		checkAfter = 55   // number of loaded proxies that triggers check5
		minSuccess = 10   // minimum number of successful proxied requests
	)

	numTest := 100
	if envCount := os.Getenv("PROX5_TEST_COUNT"); envCount != "" {
		n, e := strconv.Atoi(envCount)
		if e != nil {
			t.Skip(e.Error())
		}
		numTest = n
	}

	for i := 0; i < numTest; i++ {
		dummySOCKSServer(t, firstPort+i, &randomFail{
			t:            t,
			failedCount:  int64(0),
			failOneOutOf: entropy.RNG(200),
			maxFail:      50,
		})
		time.Sleep(time.Millisecond * 5)
	}

	newDummyHTTPSServer(t, httpPort)
	targetURL := "http://127.0.0.1:" + strconv.Itoa(httpPort)

	// Give the listeners a moment to come up before the engine starts.
	time.Sleep(time.Millisecond * 350)

	p5 := NewProxyEngine()
	p5.SetAndEnableDebugLogger(p5TestLogger{t: t})
	p5.SetMaxWorkers(10)
	p5.EnableAutoScaler()
	p5.SetAutoScalerThreshold(10)
	// p5.SetValidationTimeout(200 * time.Millisecond)
	p5.SetAutoScalerMaxScale(100)
	// p5.DisableRecycling()
	p5.SetRemoveAfter(2)

	index := firstPort

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	var once sync.Once

	// check5 pauses the engine, compares the validated count with the number
	// of loaded proxies minus the recorded failures, and resumes the engine.
	check5 := func() {
		if err := p5.Pause(); err != nil {
			t.Errorf("[FAIL] failed to pause: %s", err.Error())
		}
		time.Sleep(time.Second * 1)
		got := p5.GetTotalValidated()
		want := checkAfter - failures.Load()
		if got != want {
			t.Logf("[WARN] total validated proxies does not match expected, got: %d, expected: %d",
				got, want)
		}
		if err := p5.Resume(); err != nil {
			t.Errorf("[FAIL] failed to resume: %s", err.Error())
		}
	}

	// load feeds the next dummy proxy to the engine. Servers listen on ports
	// firstPort .. firstPort+numTest-1, so loading stops before
	// firstPort+numTest.
	load := func() {
		if index >= firstPort+numTest {
			// Everything is loaded; pause briefly so the select's default
			// branch does not spin at full speed for the rest of the test.
			time.Sleep(time.Millisecond * 10)
			return
		}
		entropy.RandSleepMS(150)
		p5.LoadSingleProxy("127.0.0.1:" + strconv.Itoa(index))
		index++
		// Run the mid-test check once exactly checkAfter proxies are loaded.
		if index == firstPort+checkAfter {
			once.Do(check5)
		}
	}

	var successCount atomic.Int64

	// makeReq issues one proxied GET against the dummy HTTP server and counts
	// it as a success when a response was received.
	makeReq := func() {
		select {
		case <-ctx.Done():
			return
		default:
		}
		resp, err := p5.GetHTTPClient().Get(targetURL)
		if err != nil {
			if errors.Is(err, ErrNoProxies) {
				return
			}
			if !errors.Is(err, net.ErrClosed) {
				t.Error("[FAIL] " + err.Error())
			}
		}
		if resp == nil {
			return
		}
		// Close the body so the underlying connection is released.
		defer func() { _ = resp.Body.Close() }()
		b, e := io.ReadAll(resp.Body)
		if e != nil && !errors.Is(e, net.ErrClosed) {
			t.Log("[WARN] " + e.Error())
		}
		t.Logf("got proxied response: %s", string(b))
		successCount.Add(1)
	}

	ticker := time.NewTicker(time.Millisecond * 100)
	defer ticker.Stop()

	if err := p5.Start(); err != nil {
		t.Fatal(err)
	}

	wait := 0
testLoop:
	for {
		select {
		case <-ctx.Done():
			successCountFinal := successCount.Load()
			if successCountFinal < minSuccess {
				t.Fatalf("[FAIL] too few successful requests: got %d, want at least %d",
					successCountFinal, minSuccess)
			}
			t.Logf("total successful requests: %d", successCountFinal)
			p5.CloseAllConns()
			break testLoop
		case <-ticker.C:
			// pre-warm: skip the first 49 ticks before issuing requests
			wait++
			if wait >= 50 {
				go makeReq()
			}
		default:
			load()
		}
	}

	cancel()
	if err := p5.Close(); err != nil {
		t.Fatal(err)
	}
	// let the proxy engine close gracefully
	time.Sleep(time.Second * 5)
}