Fixes several performance issues found in profiling and load testing. (#12)
* Fixed drop rate for subscriber listeners by adding buffering to channels (configurable) * Added optinoal pprof support * Added profile target for running profiled benchmarks
This commit is contained in:
parent
61667a8521
commit
81e35b8e18
|
@ -1,6 +1,7 @@
|
|||
*~*
|
||||
dist
|
||||
*.bak
|
||||
*.prof
|
||||
coverage.txt
|
||||
cmd/msgbus/msgbus
|
||||
cmd/msgbusd/msgbusd
|
||||
|
|
2
Makefile
2
Makefile
|
@ -37,6 +37,8 @@ image:
|
|||
@docker build --build-arg TAG=$(TAG) --build-arg BUILD=$(BUILD) -t $(REPO):$(TAG) .
|
||||
@echo "Image created: $(REPO):$(TAG)"
|
||||
|
||||
profile:
|
||||
@go test -cpuprofile cpu.prof -memprofile mem.prof -v -bench=. $(TEST_ARGS)
|
||||
bench:
|
||||
@go test -v -bench=. $(TEST_ARGS)
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/mmcloughlin/professor"
|
||||
"github.com/prologic/msgbus"
|
||||
)
|
||||
|
||||
|
@ -16,6 +17,7 @@ func main() {
|
|||
version bool
|
||||
debug bool
|
||||
bind string
|
||||
bufferLength int
|
||||
maxQueueSize int
|
||||
maxPayloadSize int
|
||||
)
|
||||
|
@ -25,6 +27,7 @@ func main() {
|
|||
|
||||
flag.StringVar(&bind, "bind", ":8000", "interface and port to bind to")
|
||||
|
||||
flag.IntVar(&bufferLength, "buffer-length", msgbus.DefaultBufferLength, "buffer length")
|
||||
flag.IntVar(&maxQueueSize, "max-queue-size", msgbus.DefaultMaxQueueSize, "maximum queue size")
|
||||
flag.IntVar(&maxPayloadSize, "max-payload-size", msgbus.DefaultMaxPayloadSize, "maximum payload size")
|
||||
|
||||
|
@ -41,7 +44,12 @@ func main() {
|
|||
os.Exit(0)
|
||||
}
|
||||
|
||||
if debug {
|
||||
go professor.Launch(":6060")
|
||||
}
|
||||
|
||||
opts := msgbus.Options{
|
||||
BufferLength: bufferLength,
|
||||
MaxQueueSize: maxQueueSize,
|
||||
MaxPayloadSize: maxPayloadSize,
|
||||
WithMetrics: true,
|
||||
|
|
34
msgbus.go
34
msgbus.go
|
@ -22,6 +22,9 @@ const (
|
|||
// DefaultMaxPayloadSize is the default maximum payload size
|
||||
DefaultMaxPayloadSize = 4096 // 4KB
|
||||
|
||||
// DefaultBufferLength is the default buffer length for subscriber chans
|
||||
DefaultBufferLength = 10
|
||||
|
||||
// Time allowed to write a message to the peer.
|
||||
writeWait = 10 * time.Second
|
||||
|
||||
|
@ -59,17 +62,36 @@ type Message struct {
|
|||
Created time.Time `json:"created"`
|
||||
}
|
||||
|
||||
// ListenerOptions ...
|
||||
type ListenerOptions struct {
|
||||
BufferLength int
|
||||
}
|
||||
|
||||
// Listeners ...
|
||||
type Listeners struct {
|
||||
sync.RWMutex
|
||||
|
||||
buflen int
|
||||
|
||||
ids map[string]bool
|
||||
chs map[string]chan Message
|
||||
}
|
||||
|
||||
// NewListeners ...
|
||||
func NewListeners() *Listeners {
|
||||
func NewListeners(options *ListenerOptions) *Listeners {
|
||||
var (
|
||||
bufferLength int
|
||||
)
|
||||
|
||||
if options != nil {
|
||||
bufferLength = options.BufferLength
|
||||
} else {
|
||||
bufferLength = DefaultBufferLength
|
||||
}
|
||||
|
||||
return &Listeners{
|
||||
buflen: bufferLength,
|
||||
|
||||
ids: make(map[string]bool),
|
||||
chs: make(map[string]chan Message),
|
||||
}
|
||||
|
@ -89,7 +111,7 @@ func (ls *Listeners) Add(id string) chan Message {
|
|||
defer ls.Unlock()
|
||||
|
||||
ls.ids[id] = true
|
||||
ls.chs[id] = make(chan Message)
|
||||
ls.chs[id] = make(chan Message, ls.buflen)
|
||||
return ls.chs[id]
|
||||
}
|
||||
|
||||
|
@ -148,6 +170,7 @@ func (ls *Listeners) NotifyAll(message Message) int {
|
|||
|
||||
// Options ...
|
||||
type Options struct {
|
||||
BufferLength int
|
||||
MaxQueueSize int
|
||||
MaxPayloadSize int
|
||||
WithMetrics bool
|
||||
|
@ -159,6 +182,7 @@ type MessageBus struct {
|
|||
|
||||
metrics *Metrics
|
||||
|
||||
bufferLength int
|
||||
maxQueueSize int
|
||||
maxPayloadSize int
|
||||
|
||||
|
@ -170,16 +194,19 @@ type MessageBus struct {
|
|||
// New ...
|
||||
func New(options *Options) *MessageBus {
|
||||
var (
|
||||
bufferLength int
|
||||
maxQueueSize int
|
||||
maxPayloadSize int
|
||||
withMetrics bool
|
||||
)
|
||||
|
||||
if options != nil {
|
||||
bufferLength = options.BufferLength
|
||||
maxQueueSize = options.MaxQueueSize
|
||||
maxPayloadSize = options.MaxPayloadSize
|
||||
withMetrics = options.WithMetrics
|
||||
} else {
|
||||
bufferLength = DefaultBufferLength
|
||||
maxQueueSize = DefaultMaxQueueSize
|
||||
maxPayloadSize = DefaultMaxPayloadSize
|
||||
withMetrics = false
|
||||
|
@ -274,6 +301,7 @@ func New(options *Options) *MessageBus {
|
|||
return &MessageBus{
|
||||
metrics: metrics,
|
||||
|
||||
bufferLength: bufferLength,
|
||||
maxQueueSize: maxQueueSize,
|
||||
maxPayloadSize: maxPayloadSize,
|
||||
|
||||
|
@ -409,7 +437,7 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
|
|||
|
||||
ls, ok := mb.listeners[t]
|
||||
if !ok {
|
||||
ls = NewListeners()
|
||||
ls = NewListeners(&ListenerOptions{BufferLength: mb.bufferLength})
|
||||
mb.listeners[t] = ls
|
||||
}
|
||||
|
||||
|
|
|
@ -78,19 +78,6 @@ func TestServeHTTPPOST(t *testing.T) {
|
|||
assert.Regexp(`message successfully published to hello with sequence \d+`, w.Body.String())
|
||||
}
|
||||
|
||||
func TestServeHTTPPUT(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
mb := New(nil)
|
||||
w := httptest.NewRecorder()
|
||||
b := bytes.NewBufferString("hello world")
|
||||
r, _ := http.NewRequest("PUT", "/hello", b)
|
||||
|
||||
mb.ServeHTTP(w, r)
|
||||
assert.Equal(w.Code, http.StatusOK)
|
||||
assert.Regexp(`message successfully published to hello with sequence \d+`, w.Body.String())
|
||||
}
|
||||
|
||||
func TestServeHTTPMaxPayloadSize(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -130,6 +117,19 @@ func TestServeHTTPSimple(t *testing.T) {
|
|||
assert.Equal(msg.Payload, []byte("hello world"))
|
||||
}
|
||||
|
||||
func BenchmarkServeHTTPPOST(b *testing.B) {
|
||||
mb := New(nil)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
w := httptest.NewRecorder()
|
||||
b := bytes.NewBufferString("hello world")
|
||||
r, _ := http.NewRequest("POST", "/hello", b)
|
||||
|
||||
mb.ServeHTTP(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServeHTTPSubscriber(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
|
|
Loading…
Reference in New Issue