Pretty much complete rewrite

This commit is contained in:
kayos@tcp.direct 2022-07-17 22:22:34 -07:00
parent 5bc828145c
commit 85b8a738f4
Signed by: kayos
GPG Key ID: 4B841471B4BEE979
4 changed files with 123 additions and 196 deletions

View File

@ -1,72 +1,36 @@
package main
// meant to act as a simple example
import (
"github.com/rs/zerolog/log"
"errors"
"strings"
"time"
"git.tcp.direct/kayos/common/squish"
termbin "git.tcp.direct/kayos/putxt"
)
func init() {
termbin.UseChannel = true
}
type handler struct{}
func incoming() {
var (
msg termbin.Message
deflated []byte
err error
)
select {
case msg = <-termbin.Msg:
switch msg.Type {
case termbin.Error:
log.Error().
Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size).
Msg(msg.Content)
case termbin.IncomingData:
log.Debug().
Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size).
Msg("INCOMING_DATA")
case termbin.Debug:
log.Debug().
Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size).
Msg(msg.Content)
case termbin.Final:
log.Info().
Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size).
Msg(msg.Content)
if termbin.Gzip {
if deflated, err = termbin.Deflate(msg.Bytes); err != nil {
log.Error().Err(err).Msg("DEFLATE_ERROR")
}
println(string(deflated))
} else {
println(string(msg.Bytes))
}
case termbin.Finish:
break
default:
log.Fatal().Msg("invalid message")
}
func (h *handler) Ingest(data []byte) ([]byte, error) {
var err error
data, err = squish.Gunzip(data)
if err != nil {
return nil, err
}
if strings.ReplaceAll(string(data), "\n", "") == "ping" {
println("got ping, sending pong...")
return []byte("pong"), nil
}
println(string(data))
return []byte("invalid request"), errors.New("invalid data")
}
func main() {
if termbin.UseChannel {
go func() {
for {
incoming()
}
}()
}
err := termbin.Listen("127.0.0.1", "8888")
td := termbin.NewTermDumpster(&handler{}).WithGzip().WithMaxSize(3 << 20).WithTimeout(5 * time.Second)
err := td.Listen("127.0.0.1", "8888")
if err != nil {
println(err.Error())
return

1
go.mod
View File

@ -3,6 +3,7 @@ module git.tcp.direct/kayos/putxt
go 1.18
require (
git.tcp.direct/kayos/common v0.7.0
github.com/rs/zerolog v1.27.0
github.com/yunginnanet/Rate5 v1.0.1
golang.org/x/tools v0.1.11

5
go.sum
View File

@ -1,3 +1,5 @@
git.tcp.direct/kayos/common v0.7.0 h1:KZDwoCzUiwQaYSWESr080N8wUVyLD27QYgzXgc7LiAQ=
git.tcp.direct/kayos/common v0.7.0/go.mod h1:7tMZBVNPLFSZk+JXTA6pgXWpf/XHqYRfT7Q3OziI++Y=
github.com/coreos/go-systemd/v22 v22.3.3-0.20220203105225-a9a7ef127534/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/dvyukov/go-fuzz v0.0.0-20210103155950-6a8e9d1f2415/go.mod h1:11Gm+ccJnvAhCNLlf5+cS9KjtbaD5I5zaZpFMsTHWTw=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@ -7,6 +9,7 @@ github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs=
@ -22,6 +25,7 @@ go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760/go.mod h1:
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9 h1:NUzdAbFtCJSXU20AOXgeqaUwg8Ypg4MPYmL+d+rsB5c=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -48,3 +52,4 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
inet.af/netaddr v0.0.0-20220617031823-097006376321 h1:B4dC8ySKTQXasnjDTMsoCMf1sQG4WsMej0WXaHxunmU=
inet.af/netaddr v0.0.0-20220617031823-097006376321/go.mod h1:OIezDfdzOgFhuw4HuWapWq2e9l0H9tK4F1j+ETRtF3k=
nullprogram.com/x/rng v1.1.0 h1:SMU7DHaQSWtKJNTpNFIFt8Wd/KSmOuSDPXrMFp/UMro=

235
main.go
View File

@ -1,200 +1,157 @@
package termbin
import (
"bytes"
"net"
"strconv"
"sync"
"time"
"github.com/yunginnanet/Rate5"
"golang.org/x/tools/godoc/util"
ipa "inet.af/netaddr"
"inet.af/netaddr"
)
var (
// Msg is a channel for status information during concurrent server operations
Msg chan Message
// Reply is a channel to receive messages to send our client upon completion
Reply chan Message
// Timeout is the read timeout for incoming data in seconds
Timeout = 4
// MaxSize for incoming data (default: 3 << 30 or 3 MiB)
MaxSize = 3 << 20
// Gzip is our toggle for enabling or disabling gzip compression
Gzip = true
// UseChannel enables or disables sending status messages through an exported channel, otherwise debug messages will be printed.
UseChannel = true
// Rater is a ratelimiter powered by Rate5.
Rater *rate5.Limiter
)
type MessageType uint8
const (
Error MessageType = iota
IncomingData
NewConn
Finish
Debug
Final
ReturnURL
ReturnError
)
// Message is a struct that encapsulates status messages to send down the Msg channel
type Message struct {
Type MessageType
RAddr string
Content string
Bytes []byte
Size int
type TermDumpster struct {
gzip bool
maxSize int64
timeout time.Duration
handler Handler
*rate5.Limiter
*sync.Pool
}
func MsgTypeToStr(m MessageType) (s string) {
switch m {
case Error:
s = "ERROR"
case IncomingData:
s = "INCOMING_DATA"
case Finish:
s = "FINISH"
case Debug:
s = "DEBUG"
case Final:
s = "FINAL"
case NewConn:
s = "NEW_CONN"
default:
s = "INVALID"
type Handler interface {
Ingest(data []byte) ([]byte, error)
}
func NewTermDumpster(handler Handler) *TermDumpster {
td := &TermDumpster{
maxSize: 3 << 20,
timeout: 5 * time.Second,
Limiter: rate5.NewStrictLimiter(60, 5),
handler: handler,
}
return
}
// Identity is used for rate5's Identity implementation.
type Identity struct {
Actual net.IP
Addr net.Addr
}
func UseGzip(toggle bool) {
Gzip = toggle
}
// UniqueKey implements rate5's Identity interface.
func (t *Identity) UniqueKey() string {
if t.Addr != nil {
t.Actual = net.ParseIP(ipa.MustParseIPPort(t.Addr.String()).String())
td.Pool = &sync.Pool{
New: func() any { return new(bytes.Buffer) },
}
return t.Actual.String()
return td
}
func init() {
Msg = make(chan Message)
Reply = make(chan Message)
Rater = rate5.NewDefaultLimiter()
func (td *TermDumpster) WithGzip() *TermDumpster {
td.gzip = true
return td
}
func termStatus(m Message) {
if UseChannel {
Msg <- m
} else {
var suffix string
if m.Size != 0 {
suffix = " (" + strconv.Itoa(m.Size) + ")"
}
println("[" + m.RAddr + "][" + MsgTypeToStr(m.Type) + "] " + m.Content + suffix)
}
func (td *TermDumpster) WithMaxSize(size int64) *TermDumpster {
td.maxSize = size
return td
}
func serve(c net.Conn) {
termStatus(Message{Type: NewConn, RAddr: c.RemoteAddr().String()})
func (td *TermDumpster) WithTimeout(timeout time.Duration) *TermDumpster {
td.timeout = timeout
return td
}
type client struct {
addr string
net.Conn
}
func (c *client) UniqueKey() string {
return c.addr
}
func newClient(c net.Conn) *client {
cipp, _ := netaddr.ParseIPPort(c.RemoteAddr().String())
return &client{addr: cipp.IP().String(), Conn: c}
}
func (td *TermDumpster) accept(c net.Conn) {
var (
data []byte
final []byte
length int
done bool
length int64
)
if Rater.Check(&Identity{Addr: c.RemoteAddr()}) {
if _, err := c.Write([]byte("RATELIMIT_REACHED")); err != nil {
client := newClient(c)
if td.Check(client) {
if _, err := client.Write([]byte("RATELIMIT_REACHED")); err != nil {
println(err.Error())
}
c.Close()
termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "RATELIMITED"})
client.Close()
// termStatus(Message{Type: Error, Remote: client.RemoteAddr().String(), Content: "RATELIMITED"})
return
}
buf := td.Pool.Get().(*bytes.Buffer)
defer func() {
_ = client.Close()
buf.Reset()
td.Put(buf)
}()
readLoop:
for {
if err := c.SetReadDeadline(time.Now().Add(time.Duration(Timeout) * time.Second)); err != nil {
if err := client.SetReadDeadline(time.Now().Add(td.timeout)); err != nil {
println(err.Error())
}
buf := make([]byte, MaxSize)
n, err := c.Read(buf)
n, err := buf.ReadFrom(client)
if err != nil {
switch err.Error() {
case "EOF":
c.Close()
termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "EOF"})
// termStatus(td.msg(EOF)
return
case "read tcp " + c.LocalAddr().String() + "->" + c.RemoteAddr().String() + ": i/o timeout":
termStatus(Message{Type: Finish, RAddr: c.RemoteAddr().String(), Size: length, Content: "TIMEOUT"})
done = true
case "read tcp " + client.LocalAddr().String() + "->" + client.RemoteAddr().String() + ": i/o timeout":
// termStatus(Message{Type: Finish, Size: length, Content: "TIMEOUT"})
break readLoop
default:
c.Close()
termStatus(Message{Type: Error, Size: length, Content: err.Error()})
// termStatus(Message{Type: Error, Size: length, Content: err.Error()})
return
}
}
if done {
break
}
length += n
if length > MaxSize {
termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Size: length, Content: "MAX_SIZE_EXCEEDED"})
if _, err := c.Write([]byte("MAX_SIZE_EXCEEDED")); err != nil {
if length > td.maxSize {
// termStatus(Message{Type: Error, Remote: client.RemoteAddr().String(), Size: length, Content: "MAX_SIZE_EXCEEDED"})
if _, err := client.Write([]byte("MAX_SIZE_EXCEEDED")); err != nil {
println(err.Error())
}
c.Close()
return
}
data = append(data, buf[:n]...)
termStatus(Message{Type: IncomingData, RAddr: c.RemoteAddr().String(), Size: length})
// termStatus(Message{Type: IncomingData, Remote: client.RemoteAddr().String(), Size: length})
}
if !util.IsText(data) {
termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "BINARY_DATA_REJECTED"})
if _, err := c.Write([]byte("BINARY_DATA_REJECTED")); err != nil {
if !util.IsText(buf.Bytes()) {
// termStatus(Message{Type: Error, Remote: client.RemoteAddr().String(), Content: "BINARY_DATA_REJECTED"})
if _, err := client.Write([]byte("BINARY_DATA_REJECTED")); err != nil {
println(err.Error())
}
c.Close()
return
}
final = data
if Gzip {
if td.gzip {
var gzerr error
if final, gzerr = gzipCompress(data); gzerr == nil {
diff := len(data) - len(final)
termStatus(Message{Type: Debug, RAddr: c.RemoteAddr().String(), Content: "GZIP_RESULT", Size: diff})
if final, gzerr = gzipCompress(buf.Bytes()); gzerr == nil {
// diff := len(buf.Bytes()) - len(final)
// termStatus(Message{Type: Debug, Remote: client.RemoteAddr().String(), Content: "GZIP_RESULT", Size: diff})
} else {
termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "GZIP_ERROR: " + gzerr.Error()})
final = buf.Bytes()
// termStatus(Message{Type: Error, Remote: client.RemoteAddr().String(), Content: "GZIP_ERROR: " + gzerr.Error()})
}
}
termStatus(Message{Type: Final, RAddr: c.RemoteAddr().String(), Size: len(final), Bytes: final, Content: "SUCCESS"})
url := <-Reply
switch url.Type {
case ReturnURL:
c.Write([]byte(url.Content))
case ReturnError:
c.Write([]byte("ERROR: " + url.Content))
default:
//
// termStatus(Message{Type: Final, Remote: client.RemoteAddr().String(), Size: len(final), Data: final, Content: "SUCCESS"})
resp, err := td.handler.Ingest(final)
if err != nil {
// termStatus(Message{Type: Error, Remote: client.RemoteAddr().String(), Content: err.Error()})
if resp == nil {
_, _ = client.Write([]byte("INTERNAL_ERROR"))
}
println(err.Error())
}
c.Close()
client.Write(resp)
}
// Listen starts the TCP server
func Listen(addr string, port string) error {
func (td *TermDumpster) Listen(addr string, port string) error {
l, err := net.Listen("tcp", addr+":"+port)
if err != nil {
return err
@ -203,8 +160,8 @@ func Listen(addr string, port string) error {
for {
c, err := l.Accept()
if err != nil {
termStatus(Message{Type: Error, Content: err.Error()})
// termStatus(Message{Type: Error, Content: err.Error()})
}
go serve(c)
go td.accept(c)
}
}