Refactor configuration with functional-style options

This commit is contained in:
James Mills 2022-04-03 15:16:31 +10:00
parent 95505d5e2b
commit 5ba388849b
Non sono state trovate chiavi note per questa firma nel database
ID Chiave GPG: AC4C014F1440EBD6
6 ha cambiato i file con 253 aggiunte e 94 eliminazioni

Vedi File

@ -5,22 +5,25 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"git.mills.io/prologic/msgbus"
)
func TestClientPublish(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := msgbus.New(nil)
mb, err := msgbus.NewMessageBus()
require.NoError(err)
server := httptest.NewServer(mb)
defer server.Close()
client := NewClient(server.URL, nil)
err := client.Publish("hello", "hello world")
assert.NoError(err)
err = client.Publish("hello", "hello world")
require.NoError(err)
topic := mb.NewTopic("hello")
expected := msgbus.Message{Topic: topic, Payload: []byte("hello world")}

Vedi File

@ -27,7 +27,9 @@ Valid optinos:
var (
version bool
debug bool
bind string
logPath string
bufferLength int
maxQueueSize int
@ -43,10 +45,17 @@ func init() {
}
flag.BoolVarP(&debug, "debug", "d", false, "enable debug logging")
flag.StringVarP(&bind, "bind", "b", "0.0.0.0:8000", "[int]:<port> to bind to")
flag.BoolVarP(&version, "version", "v", false, "display version information")
// Basic options
flag.StringVarP(
&bind, "bind", "b", msgbus.DefaultBind,
"[int]:<port> to bind to",
)
flag.StringVarP(
&logPath, "log-path", "l", msgbus.DefaultLogPath,
"path to write log files to (wal)",
)
flag.IntVarP(
&bufferLength, "buffer-length", "B", msgbus.DefaultBufferLength,
"set the buffer length for subscribers before messages are dropped",
@ -87,6 +96,8 @@ func corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "*")
w.Header().Set("Access-Control-Allow-Headers", "*")
w.Header().Set("Access-Control-Expose-Headers", "*")
next.ServeHTTP(w, r)
})
}
@ -109,13 +120,16 @@ func main() {
go professor.Launch(":6060")
}
opts := msgbus.Options{
BufferLength: bufferLength,
MaxQueueSize: maxQueueSize,
MaxPayloadSize: maxPayloadSize,
WithMetrics: true,
mb, err := msgbus.NewMessageBus(
msgbus.WithLogPath(logPath),
msgbus.WithBufferLength(bufferLength),
msgbus.WithMaxQueueSize(maxQueueSize),
msgbus.WithMaxPayloadSize(maxPayloadSize),
msgbus.WithMetrics(true),
)
if err != nil {
log.WithError(err).Fatal("error configuring message bus")
}
mb := msgbus.New(&opts)
http.Handle("/", corsMiddleware(mb))
http.Handle("/metrics", mb.Metrics().Handler())

Vedi File

@ -7,7 +7,11 @@ import (
)
func main() {
m := msgbus.New(nil)
m, err := msgbus.NewMessageBus(nil)
if err != nil {
log.Fatal(err)
}
t := m.NewTopic("foo")
m.Put(m.NewMessage(t, []byte("Hello World!")))

Vedi File

@ -20,15 +20,6 @@ import (
)
const (
// DefaultMaxQueueSize is the default maximum size of queues
DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB)
// DefaultMaxPayloadSize is the default maximum payload size
DefaultMaxPayloadSize = 8192 // 8KB
// DefaultBufferLength is the default buffer length for subscriber chans
DefaultBufferLength = 256
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
@ -195,53 +186,31 @@ func (subs *Subscribers) NotifyAll(message Message) int {
return i
}
// Options ...
type Options struct {
BufferLength int
MaxQueueSize int
MaxPayloadSize int
WithMetrics bool
}
// MessageBus ...
type MessageBus struct {
sync.RWMutex
options *Options
metrics *Metrics
bufferLength int
maxQueueSize int
maxPayloadSize int
topics map[string]*Topic
queues map[*Topic]*Queue
subscribers map[*Topic]*Subscribers
}
// New ...
func New(options *Options) *MessageBus {
var (
bufferLength int
maxQueueSize int
maxPayloadSize int
withMetrics bool
)
// NewMessageBus creates a new message bus with the provided options
func NewMessageBus(opts ...Option) (*MessageBus, error) {
options := DefaultOptions
if options != nil {
bufferLength = options.BufferLength
maxQueueSize = options.MaxQueueSize
maxPayloadSize = options.MaxPayloadSize
withMetrics = options.WithMetrics
} else {
bufferLength = DefaultBufferLength
maxQueueSize = DefaultMaxQueueSize
maxPayloadSize = DefaultMaxPayloadSize
withMetrics = false
for _, opt := range opts {
if err := opt(options); err != nil {
return nil, err
}
}
var metrics *Metrics
if withMetrics {
if options.WithMetrics {
metrics = NewMetrics("msgbus")
ctime := time.Now()
@ -326,16 +295,13 @@ func New(options *Options) *MessageBus {
}
return &MessageBus{
options: options,
metrics: metrics,
bufferLength: bufferLength,
maxQueueSize: maxQueueSize,
maxPayloadSize: maxPayloadSize,
topics: make(map[string]*Topic),
queues: make(map[*Topic]*Queue),
subscribers: make(map[*Topic]*Subscribers),
}
}, nil
}
// Len ...
@ -389,7 +355,7 @@ func (mb *MessageBus) Put(message Message) {
t := message.Topic
q, ok := mb.queues[t]
if !ok {
q = NewQueue(mb.maxQueueSize)
q = NewQueue(mb.options.MaxQueueSize)
mb.queues[message.Topic] = q
}
q.Push(message)
@ -453,7 +419,7 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
subs, ok := mb.subscribers[t]
if !ok {
subs = NewSubscribers(&SubscriberConfig{BufferLength: mb.bufferLength})
subs = NewSubscribers(&SubscriberConfig{BufferLength: mb.options.BufferLength})
mb.subscribers[t] = subs
}
@ -570,7 +536,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case "POST", "PUT":
defer r.Body.Close()
if r.ContentLength > int64(mb.maxPayloadSize) {
if r.ContentLength > int64(mb.options.MaxPayloadSize) {
msg := "payload exceeds max-payload-size"
http.Error(w, msg, http.StatusRequestEntityTooLarge)
return
@ -607,7 +573,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, msg, http.StatusBadRequest)
return
}
if len(body) > mb.maxPayloadSize {
if len(body) > mb.options.MaxPayloadSize {
msg := "payload exceeds max-payload-size"
http.Error(w, msg, http.StatusRequestEntityTooLarge)
return

Vedi File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"flag"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
@ -22,28 +23,40 @@ var (
)
func TestMessageBusLen(t *testing.T) {
mb := New(nil)
assert.Equal(t, mb.Len(), 0)
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
assert.Equal(0, mb.Len())
}
func TestMessage(t *testing.T) {
mb := New(nil)
assert.Equal(t, mb.Len(), 0)
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
assert.Equal(0, mb.Len())
topic := mb.NewTopic("foo")
expected := mb.NewMessage(topic, []byte("bar"))
mb.Put(expected)
actual, ok := mb.Get(topic)
assert.True(t, ok)
assert.Equal(t, actual, expected)
require.True(ok)
assert.Equal(expected, actual)
}
func TestMessageIds(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
mb, err := NewMessageBus()
require.NoError(err)
assert.Equal(0, mb.Len())
topic := mb.NewTopic("foo")
@ -62,28 +75,42 @@ func TestMessageIds(t *testing.T) {
}
func TestMessageGetEmpty(t *testing.T) {
mb := New(nil)
assert.Equal(t, mb.Len(), 0)
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
assert.Equal(0, mb.Len())
topic := mb.NewTopic("foo")
msg, ok := mb.Get(topic)
assert.False(t, ok)
assert.Equal(t, msg, Message{})
require.False(ok)
assert.Equal(Message{}, msg)
}
func TestMessageBusPutGet(t *testing.T) {
mb := New(nil)
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
topic := mb.NewTopic("foo")
expected := mb.NewMessage(topic, []byte("foo"))
mb.Put(expected)
actual, ok := mb.Get(topic)
assert.True(t, ok)
assert.Equal(t, actual, expected)
require.True(ok)
assert.Equal(expected, actual)
}
func TestMessageBusSubscribe(t *testing.T) {
mb := New(nil)
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
msgs := mb.Subscribe("id1", "foo")
@ -92,13 +119,47 @@ func TestMessageBusSubscribe(t *testing.T) {
mb.Put(expected)
actual := <-msgs
assert.Equal(t, actual, expected)
assert.Equal(expected, actual)
}
func TestMessageBusSubscribeWithIndex(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
mb, err := NewMessageBus()
require.NoError(err)
msgs := mb.Subscribe("id1", "foo")
topic := mb.NewTopic("foo")
expected := mb.NewMessage(topic, []byte("foo"))
mb.Put(expected)
actual := <-msgs
assert.Equal(expected, actual)
assert.Equal(0, actual.ID)
mb.Unsubscribe("id1", "foo")
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 1
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 2
msgs = mb.Subscribe("id1", "foo", WithIndex(1))
assert.Equal([]byte("bar"), (<-msgs).Payload)
assert.Equal([]byte("baz"), (<-msgs).Payload)
}
func TestMessageBusWAL(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
msgs := mb.Subscribe("id1", "foo")
@ -123,8 +184,11 @@ func TestMessageBusSubscribeWithIndex(t *testing.T) {
func TestServeHTTPGETIndexEmpty(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus(nil)
require.NoError(err)
mb := New(nil)
w := httptest.NewRecorder()
r, _ := http.NewRequest("GET", "/", nil)
@ -135,8 +199,10 @@ func TestServeHTTPGETIndexEmpty(t *testing.T) {
func TestServeHTTPGETTopics(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
mb, err := NewMessageBus()
require.NoError(err)
mb.Put(mb.NewMessage(mb.NewTopic("foo"), []byte("foo")))
mb.Put(mb.NewMessage(mb.NewTopic("hello"), []byte("hello world")))
@ -152,8 +218,11 @@ func TestServeHTTPGETTopics(t *testing.T) {
func TestServeHTTPGETEmptyQueue(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
mb := New(nil)
w := httptest.NewRecorder()
r, _ := http.NewRequest("GET", "/hello", nil)
@ -163,8 +232,11 @@ func TestServeHTTPGETEmptyQueue(t *testing.T) {
func TestServeHTTPPOST(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
mb := New(nil)
w := httptest.NewRecorder()
b := bytes.NewBufferString("hello world")
r, _ := http.NewRequest("POST", "/hello", b)
@ -175,8 +247,11 @@ func TestServeHTTPPOST(t *testing.T) {
func TestServeHTTPMaxPayloadSize(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
require.NoError(err)
mb := New(nil)
w := httptest.NewRecorder()
b := bytes.NewBuffer(bytes.Repeat([]byte{'X'}, (DefaultMaxPayloadSize * 2)))
r, _ := http.NewRequest("POST", "/hello", b)
@ -188,8 +263,10 @@ func TestServeHTTPMaxPayloadSize(t *testing.T) {
func TestServeHTTPSimple(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
mb, err := NewMessageBus()
require.NoError(err)
w := httptest.NewRecorder()
b := bytes.NewBufferString("hello world")
@ -212,7 +289,10 @@ func TestServeHTTPSimple(t *testing.T) {
}
func BenchmarkServeHTTPPOST(b *testing.B) {
mb := New(nil)
require := require.New(b)
mb, err := NewMessageBus()
require.NoError(err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -228,7 +308,8 @@ func TestServeHTTPSubscriber(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
mb, err := NewMessageBus()
require.NoError(err)
s := httptest.NewServer(mb)
defer s.Close()
@ -272,7 +353,8 @@ func TestServeHTTPSubscriberReconnect(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
mb, err := NewMessageBus()
require.NoError(err)
s := httptest.NewServer(mb)
@ -316,17 +398,20 @@ func TestServeHTTPSubscriberReconnect(t *testing.T) {
func TestMsgBusMetrics(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
opts := Options{
WithMetrics: true,
}
mb := New(&opts)
mb, err := NewMessageBus(WithMetrics(true))
require.NoError(err)
assert.IsType(&Metrics{}, mb.Metrics())
}
func BenchmarkMessageBusPut(b *testing.B) {
mb := New(nil)
require := require.New(b)
mb, err := NewMessageBus()
require.NoError(err)
topic := mb.NewTopic("foo")
msg := mb.NewMessage(topic, []byte("foo"))
b.ResetTimer()
@ -336,7 +421,11 @@ func BenchmarkMessageBusPut(b *testing.B) {
}
func BenchmarkMessageBusGet(b *testing.B) {
mb := New(nil)
require := require.New(b)
mb, err := NewMessageBus()
require.NoError(err)
topic := mb.NewTopic("foo")
msg := mb.NewMessage(topic, []byte("foo"))
for i := 0; i < b.N; i++ {
@ -349,7 +438,11 @@ func BenchmarkMessageBusGet(b *testing.B) {
}
func BenchmarkMessageBusGetEmpty(b *testing.B) {
mb := New(nil)
require := require.New(b)
mb, err := NewMessageBus()
require.NoError(err)
topic := mb.NewTopic("foo")
b.ResetTimer()
for i := 0; i < b.N; i++ {
@ -358,7 +451,11 @@ func BenchmarkMessageBusGetEmpty(b *testing.B) {
}
func BenchmarkMessageBusPutGet(b *testing.B) {
mb := New(nil)
require := require.New(b)
mb, err := NewMessageBus()
require.NoError(err)
topic := mb.NewTopic("foo")
msg := mb.NewMessage(topic, []byte("foo"))
b.ResetTimer()

75
options.go Normal file
Vedi File

@ -0,0 +1,75 @@
package msgbus
const (
// DefaultBind is the default bind address
DefaultBind = ":8000"
// DefaultLogPath is the default path to write logs to (wal)
DefaultLogPath = "./logs"
// DefaultMaxQueueSize is the default maximum size of queues
DefaultMaxQueueSize = 1024 // ~8MB per queue (1000 * 4KB)
// DefaultMaxPayloadSize is the default maximum payload size
DefaultMaxPayloadSize = 8192 // 8KB
// DefaultBufferLength is the default buffer length for subscriber chans
DefaultBufferLength = 256
// DefaultWithMetrics is the default for whether to enable metrics
DefaultWithMetrics = false
)
var DefaultOptions = &Options{
LogPath: DefaultLogPath,
BufferLength: DefaultBufferLength,
MaxQueueSize: DefaultMaxQueueSize,
MaxPayloadSize: DefaultMaxPayloadSize,
WithMetrics: DefaultWithMetrics,
}
// Options ...
type Options struct {
LogPath string
BufferLength int
MaxQueueSize int
MaxPayloadSize int
WithMetrics bool
}
type Option func(opts *Options) error
func WithLogPath(logPath string) Option {
return func(opts *Options) error {
opts.LogPath = logPath
return nil
}
}
func WithBufferLength(bufferLength int) Option {
return func(opts *Options) error {
opts.BufferLength = bufferLength
return nil
}
}
func WithMaxQueueSize(maxQueueSize int) Option {
return func(opts *Options) error {
opts.MaxQueueSize = maxQueueSize
return nil
}
}
func WithMaxPayloadSize(maxPayloadSize int) Option {
return func(opts *Options) error {
opts.MaxPayloadSize = maxPayloadSize
return nil
}
}
func WithMetrics(withMetrics bool) Option {
return func(opts *Options) error {
opts.WithMetrics = withMetrics
return nil
}
}