From 0dcbaf3b86e6ef1808b637cbbec4258ae3a99e83 Mon Sep 17 00:00:00 2001 From: "kayos@tcp.direct" Date: Tue, 4 Oct 2022 00:56:59 -0700 Subject: [PATCH] Fix: forever read + Feat: listen on unix socket --- builder.go | 50 ++++++++++++ cmd/termbintest.go | 38 ---------- go.mod | 5 +- go.sum | 10 ++- logger.go | 20 +++++ main.go | 184 --------------------------------------------- putxt.go | 146 +++++++++++++++++++++++++++++++++++ putxt_test.go | 75 ++++++++++++++++++ 8 files changed, 300 insertions(+), 228 deletions(-) create mode 100644 builder.go delete mode 100644 cmd/termbintest.go create mode 100644 logger.go delete mode 100644 main.go create mode 100644 putxt.go create mode 100644 putxt_test.go diff --git a/builder.go b/builder.go new file mode 100644 index 0000000..bbb3747 --- /dev/null +++ b/builder.go @@ -0,0 +1,50 @@ +package putxt + +import ( + "time" + + "git.tcp.direct/kayos/common/pool" + "github.com/yunginnanet/Rate5" +) + +type TermDumpster struct { + gzip bool + maxSize int64 + timeout time.Duration + handler Handler + log Logger + Pool pool.BufferFactory + *rate5.Limiter +} + +func NewTermDumpster(handler Handler) *TermDumpster { + td := &TermDumpster{ + maxSize: 3 << 20, + timeout: 5 * time.Second, + Limiter: rate5.NewStrictLimiter(60, 5), + handler: handler, + log: dummyLogger{}, + Pool: pool.NewBufferFactory(), + } + return td +} + +func (td *TermDumpster) WithGzip() *TermDumpster { + td.gzip = true + return td +} + +func (td *TermDumpster) WithMaxSize(size int64) *TermDumpster { + td.maxSize = size + return td +} + +func (td *TermDumpster) WithTimeout(timeout time.Duration) *TermDumpster { + td.timeout = timeout + return td +} + +func (td *TermDumpster) WithLogger(logger Logger) *TermDumpster { + td.log = logger + return td +} diff --git a/cmd/termbintest.go b/cmd/termbintest.go deleted file mode 100644 index aa24485..0000000 --- a/cmd/termbintest.go +++ /dev/null @@ -1,38 +0,0 @@ -package main - -// meant to act as a simple example - -import ( - "errors" - "strings" - "time" - - "git.tcp.direct/kayos/common/squish" - - termbin "git.tcp.direct/kayos/putxt" -) - -type handler struct{} - -func (h *handler) Ingest(data []byte) ([]byte, error) { - var err error - data, err = squish.Gunzip(data) - if err != nil { - return nil, err - } - if strings.ReplaceAll(string(data), "\n", "") == "ping" { - println("got ping, sending pong...") - return []byte("pong"), nil - } - println(string(data)) - return []byte("invalid request"), errors.New("invalid data") -} - -func main() { - td := termbin.NewTermDumpster(&handler{}).WithGzip().WithMaxSize(3 << 20).WithTimeout(5 * time.Second) - err := td.Listen("127.0.0.1", "8888") - if err != nil { - println(err.Error()) - return - } -} diff --git a/go.mod b/go.mod index 54ffb61..c3daaf8 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,15 @@ module git.tcp.direct/kayos/putxt go 1.18 require ( - git.tcp.direct/kayos/common v0.7.0 + git.tcp.direct/kayos/common v0.7.5 github.com/yunginnanet/Rate5 v1.1.0 golang.org/x/tools v0.1.12 - inet.af/netaddr v0.0.0-20220617031823-097006376321 + inet.af/netaddr v0.0.0-20220811202034-502d2d690317 ) require ( github.com/patrickmn/go-cache v2.1.0+incompatible // indirect go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect + nullprogram.com/x/rng v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index fee9912..4d74922 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -git.tcp.direct/kayos/common v0.7.0 h1:KZDwoCzUiwQaYSWESr080N8wUVyLD27QYgzXgc7LiAQ= -git.tcp.direct/kayos/common v0.7.0/go.mod h1:7tMZBVNPLFSZk+JXTA6pgXWpf/XHqYRfT7Q3OziI++Y= +git.tcp.direct/kayos/common v0.7.5 h1:a95oIv3pzRwzYaINqFASnXqXOWVWupIVWHcOtTVUOHU= +git.tcp.direct/kayos/common v0.7.5/go.mod h1:jVbdX9prBrx9e3aTsNpu643brGVgpLvysl40/F5U2cE= github.com/dvyukov/go-fuzz v0.0.0-20210103155950-6a8e9d1f2415/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw= github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= @@ -34,5 +34,7 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -inet.af/netaddr v0.0.0-20220617031823-097006376321 h1:B4dC8ySKTQXasnjDTMsoCMf1sQG4WsMej0WXaHxunmU= -inet.af/netaddr v0.0.0-20220617031823-097006376321/go.mod h1:OIezDfdzOgFhuw4HuWapWq2e9l0H9tK4F1j+ETRtF3k= +inet.af/netaddr v0.0.0-20220811202034-502d2d690317 h1:U2fwK6P2EqmopP/hFLTOAjWTki0qgd4GMJn5X8wOleU= +inet.af/netaddr v0.0.0-20220811202034-502d2d690317/go.mod h1:OIezDfdzOgFhuw4HuWapWq2e9l0H9tK4F1j+ETRtF3k= +nullprogram.com/x/rng v1.1.0 h1:SMU7DHaQSWtKJNTpNFIFt8Wd/KSmOuSDPXrMFp/UMro= +nullprogram.com/x/rng v1.1.0/go.mod h1:glGw6V87vyfawxCzqOABL3WfL95G65az9Z2JZCylCkg= diff --git a/logger.go b/logger.go new file mode 100644 index 0000000..d1ba4c5 --- /dev/null +++ b/logger.go @@ -0,0 +1,20 @@ +package putxt + +import "fmt" + +const ( + MessageRatelimited = "RATELIMIT_REACHED" + MessageSizeLimited = "MAX_SIZE_EXCEEDED" + MessageBinaryData = "BINARY_DATA_REJECTED" + MessageInternalError = "INTERNAL_ERROR" +) + +type Logger interface { + Printf(format string, v ...interface{}) +} + +type dummyLogger struct{} + +func (dummyLogger) Printf(format string, v ...interface{}) { + _, _ = fmt.Printf(format, v...) +} diff --git a/main.go b/main.go deleted file mode 100644 index ce54e06..0000000 --- a/main.go +++ /dev/null @@ -1,184 +0,0 @@ -package termbin - -import ( - "bytes" - "fmt" - "net" - "sync" - "time" - - "git.tcp.direct/kayos/common/squish" - "github.com/yunginnanet/Rate5" - "golang.org/x/tools/godoc/util" - "inet.af/netaddr" -) - -const ( - MessageRatelimited = "RATELIMIT_REACHED" - MessageSizeLimited = "MAX_SIZE_EXCEEDED" - MessageBinaryData = "BINARY_DATA_REJECTED" -) - -type TermDumpster struct { - gzip bool - maxSize int64 - timeout time.Duration - handler Handler - log Logger - *rate5.Limiter - *sync.Pool -} - -type Logger interface { - Printf(format string, v ...interface{}) -} - -type Handler interface { - Ingest(data []byte) ([]byte, error) -} - -type dummyLogger struct{} - -func (dummyLogger) Printf(format string, v ...interface{}) { - _, _ = fmt.Printf(format, v...) -} - -func NewTermDumpster(handler Handler) *TermDumpster { - td := &TermDumpster{ - maxSize: 3 << 20, - timeout: 5 * time.Second, - Limiter: rate5.NewStrictLimiter(60, 5), - handler: handler, - log: dummyLogger{}, - } - td.Pool = &sync.Pool{ - New: func() any { return new(bytes.Buffer) }, - } - return td -} - -func (td *TermDumpster) WithGzip() *TermDumpster { - td.gzip = true - return td -} - -func (td *TermDumpster) WithMaxSize(size int64) *TermDumpster { - td.maxSize = size - return td -} - -func (td *TermDumpster) WithTimeout(timeout time.Duration) *TermDumpster { - td.timeout = timeout - return td -} - -func (td *TermDumpster) WithLogger(logger Logger) *TermDumpster { - td.log = logger - return td -} - -type termbinClient struct { - parent *TermDumpster - addr string - net.Conn -} - -func (c *termbinClient) UniqueKey() string { - return c.addr -} - -func (td *TermDumpster) newClient(c net.Conn) *termbinClient { - cipp, _ := netaddr.ParseIPPort(c.RemoteAddr().String()) - return &termbinClient{parent: td, addr: cipp.IP().String(), Conn: c} -} - -func (c *termbinClient) write(data []byte) { - if _, err := c.Write(data); err != nil { - c.parent.log.Printf("termbinClient: %s error: %w", c.RemoteAddr().String(), err) - } -} - -func (c *termbinClient) writeString(data string) { - c.write([]byte(data)) -} - -func (td *TermDumpster) accept(c net.Conn) { - var ( - final []byte - length int64 - ) - client := td.newClient(c) - if td.Check(client) { - client.writeString(MessageRatelimited) - client.Close() - td.log.Printf("termbinClient: %s error: %s", client.RemoteAddr().String(), MessageRatelimited) - return - } - buf := td.Pool.Get().(*bytes.Buffer) - defer func() { - _ = client.Close() - buf.Reset() - td.Put(buf) - }() -readLoop: - for { - if err := client.SetReadDeadline(time.Now().Add(td.timeout)); err != nil { - td.log.Printf("failed to set read deadline: %s error: %w", client.RemoteAddr().String(), err) - return - } - n, err := buf.ReadFrom(client) - if err != nil { - switch err.Error() { - case "EOF": - break readLoop - case "read tcp " + client.LocalAddr().String() + "->" + client.RemoteAddr().String() + ": i/o timeout": - break readLoop - default: - td.log.Printf("termbinClient: %s error: %w", client.RemoteAddr().String(), err) - return - } - } - length += n - if length > td.maxSize { - client.writeString(MessageSizeLimited) - return - } - } - if !util.IsText(buf.Bytes()) { - client.writeString(MessageBinaryData) - return - } - if td.gzip { - if final = squish.Gzip(buf.Bytes()); final == nil { - final = buf.Bytes() - } - } - resp, err := td.handler.Ingest(final) - if err != nil { - if resp == nil { - client.writeString("INTERNAL_ERROR") - } - td.log.Printf("termbinClient: %s error: %w", client.RemoteAddr().String(), err) - } - _, err = client.Write(resp) - if err != nil { - td.log.Printf("termbinClient: %s failed to deliver result: %w", client.RemoteAddr().String(), err) - } -} - -// Listen starts the TCP server -func (td *TermDumpster) Listen(addr string, port string) error { - l, err := net.Listen("tcp", addr+":"+port) - if err != nil { - return err - } - defer l.Close() - for { - c, err := l.Accept() - if err != nil { - td.log.Printf("Error accepting connection: %s", err.Error()) - continue - } - go td.accept(c) - } -} diff --git a/putxt.go b/putxt.go new file mode 100644 index 0000000..edeceb5 --- /dev/null +++ b/putxt.go @@ -0,0 +1,146 @@ +package putxt + +import ( + "errors" + "io" + "net" + "os" + "syscall" + "time" + + "git.tcp.direct/kayos/common/squish" + "golang.org/x/tools/godoc/util" + "inet.af/netaddr" +) + +type Handler interface { + Ingest(data []byte) ([]byte, error) +} + +type termbinClient struct { + parent *TermDumpster + addr string + net.Conn +} + +func (c *termbinClient) UniqueKey() string { + return c.addr +} + +func (td *TermDumpster) newClient(c net.Conn) *termbinClient { + cipp, _ := netaddr.ParseIPPort(c.RemoteAddr().String()) + return &termbinClient{parent: td, addr: cipp.IP().String(), Conn: c} +} + +func (c *termbinClient) write(data []byte) { + if _, err := c.Write(data); err != nil { + c.parent.log.Printf("termbinClient: %s error: %s", c.RemoteAddr().String(), err.Error()) + } +} + +func (c *termbinClient) writeString(data string) { + c.write([]byte(data)) +} + +func (td *TermDumpster) accept(c net.Conn) { + var final []byte + client := td.newClient(c) + if td.Check(client) { + client.writeString(MessageRatelimited) + _ = client.Close() + td.log.Printf("termbinClient: %s error: %s", client.RemoteAddr().String(), MessageRatelimited) + return + } + buf := td.Pool.Get() + defer func() { + _ = client.Close() + td.Pool.MustPut(buf) + }() + if err := client.SetReadDeadline(time.Now().Add(td.timeout)); err != nil { + td.log.Printf("failed to set read deadline: %s error: %s", client.RemoteAddr().String(), err.Error()) + return + } +readLoop: + for { + _, err := buf.ReadFrom(client) + if err != nil { + switch { + case errors.Is(err, io.EOF): + break readLoop + case os.IsTimeout(err): + break readLoop + default: + td.log.Printf("termbinClient: %s error: %s", client.RemoteAddr().String(), err.Error()) + return + } + } + if int64(buf.Len()) > td.maxSize { + client.writeString(MessageSizeLimited) + return + } + } + if !util.IsText(buf.Bytes()) { + client.writeString(MessageBinaryData) + return + } + if td.gzip { + if final = squish.Gzip(buf.Bytes()); final == nil { + client.writeString(MessageInternalError) + td.log.Printf( + "termbinClient: %s error: gzipping data provided empty result", + client.RemoteAddr().String()) + return + } + } + resp, err := td.handler.Ingest(final) + if err != nil { + if resp == nil { + client.writeString(MessageInternalError) + } + td.log.Printf("termbinClient: %s error: %s", client.RemoteAddr().String(), err.Error()) + return + } + _, err = client.Write(resp) + if err != nil { + td.log.Printf("termbinClient: %s failed to deliver result: %s", client.RemoteAddr().String(), err.Error()) + } +} + +func (td *TermDumpster) handle(l net.Listener) { + defer l.Close() + for { + c, acceptErr := l.Accept() + if acceptErr != nil { + td.log.Printf("Error accepting connection: %s", acceptErr.Error()) + continue + } + go td.accept(c) + } +} + +// Listen starts the TCP server +func (td *TermDumpster) Listen(addr string, port string) error { + l, err := net.Listen("tcp", addr+":"+port) + if err != nil { + return err + } + td.handle(l) + return nil +} + +// ListenUnixSocket starts the unix socket listener +func (td *TermDumpster) ListenUnixSocket(path string) error { + unixAddr, err := net.ResolveUnixAddr("unix", path) + if err != nil { + return err + } + _ = syscall.Unlink(path) + mask := syscall.Umask(0o077) + unixListener, err := net.ListenUnix("unix", unixAddr) + syscall.Umask(mask) + if err != nil { + return err + } + td.handle(unixListener) + return nil +} diff --git a/putxt_test.go b/putxt_test.go new file mode 100644 index 0000000..c42802a --- /dev/null +++ b/putxt_test.go @@ -0,0 +1,75 @@ +package putxt + +// meant to act as a simple example + +import ( + "bytes" + "errors" + "net" + "strings" + "testing" + "time" + + "git.tcp.direct/kayos/common/entropy" + "git.tcp.direct/kayos/common/squish" +) + +type handler struct { + t *testing.T + needle []byte +} + +func (h *handler) Ingest(data []byte) ([]byte, error) { + var err error + data, err = squish.Gunzip(data) + if err != nil { + return nil, err + } + if strings.ReplaceAll(string(data), "\n", "") == string(h.needle) { + h.t.Log("got needle, echoing it back...") + return h.needle, nil + } + return []byte("invalid request"), errors.New("data does not match generated test needle: " + string(data)) +} + +func TestPutxt(t *testing.T) { + socketPath := t.TempDir() + "/putxt.sock" + testHandler := &handler{t: t, needle: []byte(entropy.RandStr(4096))} + td := NewTermDumpster(testHandler).WithGzip().WithMaxSize(3 << 20).WithTimeout(5 * time.Second) + var errChan = make(chan error) + go func() { + err := td.ListenUnixSocket(socketPath) + if err != nil { + errChan <- err + } + }() + select { + case err := <-errChan: + t.Fatalf("failed to listen on unix socket: %v", err.Error()) + default: + time.Sleep(10 * time.Millisecond) + c, err := net.Dial("unix", socketPath) + if err != nil { + t.Fatalf("failed to connect to unix socket: %v", err.Error()) + } + defer c.Close() + res := make(chan []byte) + go func() { + buf := make([]byte, 4096) + n, err := c.Read(buf) + if err != nil { + errChan <- err + } + res <- buf[:n] + }() + _, err = c.Write(testHandler.needle) + if err != nil { + t.Fatalf("failed to write to unix socket: %v", err.Error()) + } + + buf := <-res + if !bytes.Equal(buf, testHandler.needle) { + t.Fatalf("expected %s, got %s", testHandler.needle, string(buf)) + } + } +}