Implement log writing side

This commit is contained in:
James Mills 2022-04-03 16:11:56 +10:00
부모 dc2d6b3224
커밋 274f42f3ad
No known key found for this signature in database
GPG 키 ID: AC4C014F1440EBD6
4개의 변경된 파일77개의 추가작업 그리고 16개의 파일을 삭제

6
go.mod
파일 보기

@ -4,6 +4,7 @@ go 1.18
require (
github.com/andybalholm/brotli v1.0.4
github.com/cyphar/filepath-securejoin v0.2.3
github.com/gorilla/websocket v1.5.0
github.com/jpillora/backoff v1.0.0
github.com/mitchellh/go-homedir v1.1.0
@ -15,6 +16,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/tidwall/wal v1.1.7
nhooyr.io/websocket v1.8.7
)
@ -41,6 +43,10 @@ require (
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tidwall/gjson v1.10.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/tinylru v1.1.0 // indirect
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect

12
go.sum
파일 보기

@ -61,6 +61,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cpuguy83/go-md2man/v2 v2.0.1/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cyphar/filepath-securejoin v0.2.3 h1:YX6ebbZCZP7VkM3scTTokDgBL2TY741X51MTk3ycuNI=
github.com/cyphar/filepath-securejoin v0.2.3/go.mod h1:aPGpWjXOXUn2NCNjFvBE6aRxGGx79pTxQpKOJNYHHl4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -282,6 +284,16 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/gjson v1.10.2 h1:APbLGOM0rrEkd8WBw9C24nllro4ajFuJu0Sc9hRz8Bo=
github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I=
github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8=
github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4=
github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=

파일 보기

@ -13,8 +13,10 @@ import (
"time"
"github.com/andybalholm/brotli"
securejoin "github.com/cyphar/filepath-securejoin"
sync "github.com/sasha-s/go-deadlock"
log "github.com/sirupsen/logrus"
"github.com/tidwall/wal"
"github.com/gorilla/websocket"
)
@ -45,7 +47,7 @@ type HandlerFunc func(msg *Message) error
// Topic ...
type Topic struct {
Name string `json:"name"`
Sequence int `json:"seq"`
Sequence int64 `json:"seq"`
Created time.Time `json:"created"`
}
@ -55,22 +57,31 @@ func (t *Topic) String() string {
// Message ...
type Message struct {
ID int `json:"id"`
ID int64 `json:"id"`
Topic *Topic `json:"topic"`
Payload []byte `json:"payload"`
Created time.Time `json:"created"`
}
func LoadMessage(data []byte) (m *Message, err error) {
err = json.Unmarshal(data, &m)
return
}
func (m *Message) Bytes() ([]byte, error) {
return json.Marshal(m)
}
// SubscribeOption ...
type SubscribeOption func(*SubscriberOptions)
// SubscriberOptions ...
type SubscriberOptions struct {
Index int
Index int64
}
// WithIndex sets the index to start subscribing from
func WithIndex(index int) SubscribeOption {
func WithIndex(index int64) SubscribeOption {
return func(o *SubscriberOptions) { o.Index = index }
}
@ -193,8 +204,10 @@ type MessageBus struct {
options *Options
metrics *Metrics
topics map[string]*Topic
queues map[*Topic]*Queue
topics map[string]*Topic
queues map[*Topic]*Queue
logs map[*Topic]*wal.Log
subscribers map[*Topic]*Subscribers
}
@ -298,8 +311,10 @@ func NewMessageBus(opts ...Option) (*MessageBus, error) {
options: options,
metrics: metrics,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
logs: make(map[*Topic]*wal.Log),
subscribers: make(map[*Topic]*Subscribers),
}, nil
}
@ -348,15 +363,41 @@ func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
}
// Put ...
func (mb *MessageBus) Put(message Message) {
func (mb *MessageBus) Put(message Message) error {
mb.Lock()
defer mb.Unlock()
t := message.Topic
l, ok := mb.logs[t]
if !ok {
fn, err := securejoin.SecureJoin(mb.options.LogPath, t.Name)
if err != nil {
return fmt.Errorf("error creating logfile filename for %s: %w", t.Name, err)
}
l, err := wal.Open(fn, nil)
if err != nil {
return fmt.Errorf("error opening logfile %s: %w", fn, err)
}
mb.logs[t] = l
}
id := uint64(message.ID)
data, err := message.Bytes()
if err != nil {
return fmt.Errorf("error serializing message %d: %w", id, err)
}
if err := l.Write(id, data); err != nil {
return fmt.Errorf("error writing message %d to logfile: %w", message.ID, err)
}
q, ok := mb.queues[t]
if !ok {
q = NewQueue(mb.options.MaxQueueSize)
mb.queues[message.Topic] = q
mb.queues[t] = q
}
q.Push(message)
@ -365,6 +406,8 @@ func (mb *MessageBus) Put(message Message) {
}
mb.publish(message)
return nil
}
// Get ...
@ -589,7 +632,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
i := SafeParseInt(r.URL.Query().Get("index"), -1)
i := SafeParseInt64(r.URL.Query().Get("index"), -1)
log.Debugf("new subscriber for %s from %s", t.Name, r.RemoteAddr)
@ -624,7 +667,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type Client struct {
conn *websocket.Conn
topic *Topic
index int
index int64
bus *MessageBus
id string
@ -632,7 +675,7 @@ type Client struct {
}
// NewClient ...
func NewClient(conn *websocket.Conn, topic *Topic, index int, bus *MessageBus) *Client {
func NewClient(conn *websocket.Conn, topic *Topic, index int64, bus *MessageBus) *Client {
return &Client{conn: conn, topic: topic, index: index, bus: bus}
}

파일 보기

@ -2,9 +2,9 @@ package msgbus
import "strconv"
// SafeParseInt ...
func SafeParseInt(s string, d int) int {
n, e := strconv.Atoi(s)
// SafeParseInt64 ...
func SafeParseInt64(s string, d int64) int64 {
n, e := strconv.ParseInt(s, 10, 64)
if e != nil {
return d
}