staging(WAL): initial WAL support

This commit is contained in:
Nate 2022-04-02 23:41:57 -04:00
джерело 95505d5e2b
коміт a6d8bafa03
6 змінених файлів з 169 додано та 7 видалено

@ -32,6 +32,7 @@ var (
bufferLength int
maxQueueSize int
maxPayloadSize int
useWAL bool
)
func init() {
@ -44,6 +45,7 @@ func init() {
flag.BoolVarP(&debug, "debug", "d", false, "enable debug logging")
flag.StringVarP(&bind, "bind", "b", "0.0.0.0:8000", "[int]:<port> to bind to")
flag.BoolVarP(&useWAL, "wal", "w", false, "enable WAL")
flag.BoolVarP(&version, "version", "v", false, "display version information")
// Basic options
@ -114,6 +116,7 @@ func main() {
MaxQueueSize: maxQueueSize,
MaxPayloadSize: maxPayloadSize,
WithMetrics: true,
WithWAL: useWAL,
}
mb := msgbus.New(&opts)

17
go.mod

@ -27,8 +27,6 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.3 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
@ -37,14 +35,23 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect
github.com/tidwall/gjson v1.10.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/tinylru v1.1.0 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
require (
github.com/magiconair/properties v1.8.6 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/tidwall/wal v1.1.7
golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8 // indirect
gopkg.in/ini.v1 v1.66.4 // indirect
)

13
go.sum

@ -190,9 +190,8 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@ -282,6 +281,16 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tidwall/gjson v1.10.2 h1:APbLGOM0rrEkd8WBw9C24nllro4ajFuJu0Sc9hRz8Bo=
github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I=
github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8=
github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4=
github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=

@ -200,7 +200,9 @@ type Options struct {
BufferLength int
MaxQueueSize int
MaxPayloadSize int
WALpath string
WithMetrics bool
WithWAL bool
}
// MessageBus ...
@ -213,6 +215,11 @@ type MessageBus struct {
maxQueueSize int
maxPayloadSize int
WALer *WALMgr
useWAL bool
options *Options
topics map[string]*Topic
queues map[*Topic]*Queue
subscribers map[*Topic]*Subscribers
@ -224,19 +231,25 @@ func New(options *Options) *MessageBus {
bufferLength int
maxQueueSize int
maxPayloadSize int
WALpath string
withMetrics bool
withWAL bool
)
if options != nil {
bufferLength = options.BufferLength
maxQueueSize = options.MaxQueueSize
maxPayloadSize = options.MaxPayloadSize
WALpath = WALpath
withMetrics = options.WithMetrics
withWAL = options.WithWAL
} else {
bufferLength = DefaultBufferLength
maxQueueSize = DefaultMaxQueueSize
maxPayloadSize = DefaultMaxPayloadSize
WALpath = "."
withMetrics = false
withWAL = false
}
var metrics *Metrics
@ -325,6 +338,8 @@ func New(options *Options) *MessageBus {
)
}
var waler WALMgr
return &MessageBus{
metrics: metrics,
@ -332,6 +347,11 @@ func New(options *Options) *MessageBus {
maxQueueSize: maxQueueSize,
maxPayloadSize: maxPayloadSize,
WALer: &waler,
useWAL: withWAL,
options: options,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
subscribers: make(map[*Topic]*Subscribers),

113
wal.go Normal file

@ -0,0 +1,113 @@
package msgbus
import (
"encoding/json"
"log"
"path/filepath"
"github.com/tidwall/wal"
)
type WALMgr struct {
Queue chan Message
}
func (w *WALMgr) Run(t *Topic, opts *Options) {
go func() {
w.Queue = make(chan Message)
l, err := wal.Open(filepath.Join(opts.WALpath, t.Name+".wal"), wal.DefaultOptions)
if err != nil {
log.Println(err)
return
}
// Get the first index
fi, err := l.FirstIndex()
if err != nil {
log.Println(err)
return
}
var ci uint64
for {
msg := <- w.Queue
// Marshal JSON
data, err := json.Marshal(&msg)
if err != nil {
log.Println(err)
continue
}
// Write message to WAL
if err = l.Write(ci, data); err != nil {
log.Println(err)
continue
}
// If exceeding max message queue, truncate as necessary in order that
// we never exceed the max message queue.
ci++
if (ci - fi) > uint64(opts.MaxQueueSize) {
if err = l.TruncateFront(ci-uint64(opts.MaxQueueSize)); err != nil {
log.Println(err)
continue
}
}
// Update the first index
fi = ci-uint64(opts.MaxQueueSize)
}
}()
}
func ReadWAL(t *Topic, opts *Options) (msgs []Message, err error) {
// Open the log file
l, err := wal.Open(filepath.Join(opts.WALpath, t.Name+".wal"), wal.DefaultOptions)
if err != nil {
return
}
// Determine first index
fi, err := l.FirstIndex()
if err != nil {
return
}
// If there are no entries, return
if fi == 0 {
return
}
// Determine the last index
li, err := l.LastIndex()
if err != nil {
return
}
// Determine the amount of entries to read
var tr uint64
if (li - fi) > uint64(opts.MaxQueueSize) {
tr = uint64(opts.MaxQueueSize)
} else {
tr = li - fi
}
var dat []byte
var msg Message
for i := fi; i < fi+tr; i++ {
dat, err = l.Read(i)
if err != nil {
return
}
err = json.Unmarshal(dat, &msg)
if err != nil {
return
}
msgs = append(msgs, msg)
}
return
}

10
wal_test.go Normal file

@ -0,0 +1,10 @@
package msgbus
import (
"testing"
)
// TODO(Nate)
func TestReadWAL(t *testing.T) {
// ...
}