Fix Subscribe() deadlock

This commit is contained in:
James Mills 2022-04-04 09:06:51 +10:00
parent 6bfb669347
commit 284567a3b6
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
2 changed files with 55 additions and 6 deletions

View File

@ -4,6 +4,7 @@ import (
"compress/flate"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
@ -36,6 +37,12 @@ const (
pingPeriod = (pongWait * 9) / 10
)
var (
// ErrSubscriberBufferFull is logged in Subscribe() when a subscriber's
// buffer is full and messages can no longer be enqueued for delivery
ErrSubscriberBufferFull = errors.New("error: subscriber buffer full")
)
// TODO: Make this configurable?
var upgrader = websocket.Upgrader{
ReadBufferSize: 4096,
@ -586,27 +593,41 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
if o.Index > 0 && o.Index <= t.Sequence {
var n int
log.Debugf("subscriber wants to start from %d", o.Index)
q.ForEach(func(item interface{}) error {
err := q.ForEach(func(item interface{}) error {
if msg, ok := item.(Message); ok && msg.ID >= o.Index {
log.Debugf("found msg #%d", msg.ID)
ch <- msg
n++
select {
case ch <- msg:
n++
default:
return ErrSubscriberBufferFull
}
}
return nil
})
if err != nil {
log.WithError(err).Error("error publishing messages to new subscriber")
}
log.Debugf("published %d messages", n)
} else {
// ELSE start from the beginning (invalid Index or Topic was reset)
// NB: This should not happen with write-ahead-logs (WAL)
var n int
log.Debugf("subscriber wanted to start from invalid %d (topic is at %d)", o.Index, t.Sequence)
q.ForEach(func(item interface{}) error {
err := q.ForEach(func(item interface{}) error {
if msg, ok := item.(Message); ok {
ch <- msg
n++
select {
case ch <- msg:
n++
default:
return ErrSubscriberBufferFull
}
}
return nil
})
if err != nil {
log.WithError(err).Error("error publishing messages to new subscriber")
}
log.Debugf("published %d messages", n)
}

View File

@ -146,6 +146,34 @@ func TestMessageBusSubscribe(t *testing.T) {
assert.Equal(expected, actual)
}
func TestMessageBusSubscribeFullBuffer(t *testing.T) {
//assert := assert.New(t)
require := require.New(t)
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(
WithBufferLength(2),
WithLogPath(testdir),
)
require.NoError(err)
topic := mb.NewTopic("hello")
mb.Put(mb.NewMessage(topic, []byte("foo"))) // ID == 1
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
msgs := mb.Subscribe("id1", "hello")
go func() {
<-msgs
}()
err = mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
require.NoError(err)
}
func TestMessageBusSubscribeWithIndex(t *testing.T) {
assert := assert.New(t)
require := require.New(t)