(I/R)PC: Change message API

This commit is contained in:
kayos@tcp.direct 2022-01-21 04:04:03 -08:00
parent afbe5f3ca6
commit 75c23cc539
Signed by: kayos
GPG Key ID: 4B841471B4BEE979
2 changed files with 61 additions and 22 deletions

@ -20,25 +20,23 @@ func incoming() {
select { select {
case msg = <-termbin.Msg: case msg = <-termbin.Msg:
switch msg.Type { switch msg.Type {
case "ERROR": case termbin.Error:
log.Error(). log.Error().
Str("RemoteAddr", msg.RAddr). Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size). Int("Size", msg.Size).
Msg(msg.Content) Msg(msg.Content)
case "INCOMING_DATA": case termbin.IncomingData:
log.Debug(). log.Debug().
Str("RemoteAddr", msg.RAddr). Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size). Int("Size", msg.Size).
Msg("INCOMING_DATA") Msg("INCOMING_DATA")
case "FINISH": case termbin.Debug:
fallthrough
case "DEBUG":
log.Debug(). log.Debug().
Str("RemoteAddr", msg.RAddr). Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size). Int("Size", msg.Size).
Msg(msg.Content) Msg(msg.Content)
case "FINAL": case termbin.Final:
log.Info(). log.Info().
Str("RemoteAddr", msg.RAddr). Str("RemoteAddr", msg.RAddr).
Int("Size", msg.Size). Int("Size", msg.Size).
@ -51,6 +49,10 @@ func incoming() {
} else { } else {
println(string(msg.Bytes)) println(string(msg.Bytes))
} }
case termbin.Finish:
break
default:
log.Fatal().Msg("invalid message")
} }
} }
} }

69
main.go

@ -27,21 +27,58 @@ var (
Rater *rate5.Limiter Rater *rate5.Limiter
) )
type MessageType uint8
const (
Error MessageType = iota
IncomingData
NewConn
Finish
Debug
Final
ReturnURL
ReturnError
)
// Message is a struct that encapsulates status messages to send down the Msg channel // Message is a struct that encapsulates status messages to send down the Msg channel
type Message struct { type Message struct {
Type string Type MessageType
RAddr string RAddr string
Content string Content string
Bytes []byte Bytes []byte
Size int Size int
} }
func MsgTypeToStr(m MessageType) (s string) {
switch m {
case Error:
s = "ERROR"
case IncomingData:
s = "INCOMING_DATA"
case Finish:
s = "FINISH"
case Debug:
s = "DEBUG"
case Final:
s = "FINAL"
case NewConn:
s = "NEW_CONN"
default:
s = "INVALID"
}
return
}
// Identity is used for rate5's Identity implementation. // Identity is used for rate5's Identity implementation.
type Identity struct { type Identity struct {
Actual net.IP Actual net.IP
Addr net.Addr Addr net.Addr
} }
func UseGzip(toggle bool) {
Gzip = toggle
}
// UniqueKey implements rate5's Identity interface. // UniqueKey implements rate5's Identity interface.
func (t *Identity) UniqueKey() string { func (t *Identity) UniqueKey() string {
if t.Addr != nil { if t.Addr != nil {
@ -64,12 +101,12 @@ func termStatus(m Message) {
if m.Size != 0 { if m.Size != 0 {
suffix = " (" + strconv.Itoa(m.Size) + ")" suffix = " (" + strconv.Itoa(m.Size) + ")"
} }
println("[" + m.RAddr + "][" + m.Type + "] " + m.Content + suffix) println("[" + m.RAddr + "][" + MsgTypeToStr(m.Type) + "] " + m.Content + suffix)
} }
} }
func serve(c net.Conn) { func serve(c net.Conn) {
termStatus(Message{Type: "NEW_CONN", RAddr: c.RemoteAddr().String()}) termStatus(Message{Type: NewConn, RAddr: c.RemoteAddr().String()})
var ( var (
data []byte data []byte
final []byte final []byte
@ -82,7 +119,7 @@ func serve(c net.Conn) {
println(err.Error()) println(err.Error())
} }
c.Close() c.Close()
termStatus(Message{Type: "ERROR", RAddr: c.RemoteAddr().String(), Content: "RATELIMITED"}) termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "RATELIMITED"})
return return
} }
@ -96,14 +133,14 @@ func serve(c net.Conn) {
switch err.Error() { switch err.Error() {
case "EOF": case "EOF":
c.Close() c.Close()
termStatus(Message{Type: "ERROR", RAddr: c.RemoteAddr().String(), Content: "EOF"}) termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "EOF"})
return return
case "read tcp " + c.LocalAddr().String() + "->" + c.RemoteAddr().String() + ": i/o timeout": case "read tcp " + c.LocalAddr().String() + "->" + c.RemoteAddr().String() + ": i/o timeout":
termStatus(Message{Type: "FINISH", RAddr: c.RemoteAddr().String(), Size: length, Content: "TIMEOUT"}) termStatus(Message{Type: Finish, RAddr: c.RemoteAddr().String(), Size: length, Content: "TIMEOUT"})
done = true done = true
default: default:
c.Close() c.Close()
termStatus(Message{Type: "ERROR", Size: length, Content: err.Error()}) termStatus(Message{Type: Error, Size: length, Content: err.Error()})
return return
} }
} }
@ -112,7 +149,7 @@ func serve(c net.Conn) {
} }
length += n length += n
if length > MaxSize { if length > MaxSize {
termStatus(Message{Type: "ERROR", RAddr: c.RemoteAddr().String(), Size: length, Content: "MAX_SIZE_EXCEEDED"}) termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Size: length, Content: "MAX_SIZE_EXCEEDED"})
if _, err := c.Write([]byte("MAX_SIZE_EXCEEDED")); err != nil { if _, err := c.Write([]byte("MAX_SIZE_EXCEEDED")); err != nil {
println(err.Error()) println(err.Error())
} }
@ -120,10 +157,10 @@ func serve(c net.Conn) {
return return
} }
data = append(data, buf[:n]...) data = append(data, buf[:n]...)
termStatus(Message{Type: "INCOMING_DATA", RAddr: c.RemoteAddr().String(), Size: length}) termStatus(Message{Type: IncomingData, RAddr: c.RemoteAddr().String(), Size: length})
} }
if !util.IsText(data) { if !util.IsText(data) {
termStatus(Message{Type: "ERROR", RAddr: c.RemoteAddr().String(), Content: "BINARY_DATA_REJECTED"}) termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "BINARY_DATA_REJECTED"})
if _, err := c.Write([]byte("BINARY_DATA_REJECTED")); err != nil { if _, err := c.Write([]byte("BINARY_DATA_REJECTED")); err != nil {
println(err.Error()) println(err.Error())
} }
@ -137,18 +174,18 @@ func serve(c net.Conn) {
var gzerr error var gzerr error
if final, gzerr = gzipCompress(data); gzerr == nil { if final, gzerr = gzipCompress(data); gzerr == nil {
diff := len(data) - len(final) diff := len(data) - len(final)
termStatus(Message{Type: "DEBUG", RAddr: c.RemoteAddr().String(), Content: "GZIP_RESULT", Size: diff}) termStatus(Message{Type: Debug, RAddr: c.RemoteAddr().String(), Content: "GZIP_RESULT", Size: diff})
} else { } else {
termStatus(Message{Type: "ERROR", RAddr: c.RemoteAddr().String(), Content: "GZIP_ERROR: " + gzerr.Error()}) termStatus(Message{Type: Error, RAddr: c.RemoteAddr().String(), Content: "GZIP_ERROR: " + gzerr.Error()})
} }
} }
termStatus(Message{Type: "FINAL", RAddr: c.RemoteAddr().String(), Size: len(final), Bytes: final, Content: "SUCCESS"}) termStatus(Message{Type: Final, RAddr: c.RemoteAddr().String(), Size: len(final), Bytes: final, Content: "SUCCESS"})
url := <-Reply url := <-Reply
switch url.Type { switch url.Type {
case "URL": case ReturnURL:
c.Write([]byte(url.Content)) c.Write([]byte(url.Content))
case "ERROR": case ReturnError:
c.Write([]byte("ERROR: " + url.Content)) c.Write([]byte("ERROR: " + url.Content))
default: default:
// //
@ -166,7 +203,7 @@ func Listen(addr string, port string) error {
for { for {
c, err := l.Accept() c, err := l.Accept()
if err != nil { if err != nil {
termStatus(Message{Type: "ERROR", Content: err.Error()}) termStatus(Message{Type: Error, Content: err.Error()})
} }
go serve(c) go serve(c)
} }