Fix Subscribe() deadlock (#34)

Fixes #139

This was caused by full subscriber buffers.

Co-authored-by: James Mills <prologic@shortcircuit.net.au>
Reviewed-on: https://git.mills.io/prologic/msgbus/pulls/34
This commit is contained in:
James Mills 2022-04-04 01:15:49 +00:00
джерело 6bfb669347
коміт 01ab56f9b3
9 змінених файлів з 121 додано та 28 видалено

@ -71,14 +71,14 @@ fmt:
@$(GOCMD) fmt ./...
test:
@CGO_ENABLED=1 $(GOCMD) test -v -cover -race ./...
@CGO_ENABLED=1 $(GOCMD) test -d -v -cover -race -timeout 30s ./...
coverage:
@CGO_ENABLED=1 $(GOCMD) test -v -cover -race -cover -coverprofile=coverage.out ./...
@CGO_ENABLED=1 $(GOCMD) test -d -v -cover -race -timeout 30s -cover -coverprofile=coverage.out ./...
@$(GOCMD) tool cover -html=coverage.out
bench:
@CGO_ENABLED=1 $(GOCMD) test -v -cover -race -cover -coverprofile=coverage.out ./...
@CGO_ENABLED=1 $(GOCMD) test -d -v -cover -race -timeout 30s -cover -coverprofile=coverage.out ./...
@CGO_ENABLED=1 $(GOCMD) test -race -benchtime=1x -cpu 16 -benchmem -bench "^(Benchmark)" .
clean:

@ -55,12 +55,12 @@ func init() {
RootCmd.AddCommand(subCmd)
subCmd.Flags().IntP(
"index", "i", -1,
"position in the topic's sequence to start subscribing from (-1 indicates end)",
"index", "i", 0,
"position in the topic's sequence to start subscribing from (0 indicates head)",
)
viper.BindPFlag("index", subCmd.Flags().Lookup("index"))
viper.SetDefault("index", -1)
viper.SetDefault("index", 0)
}
func handler(command string, args []string) msgbus.HandlerFunc {

BIN
examples/examples Executable file

Бінарний файл не відображається.

@ -7,7 +7,7 @@ import (
)
func main() {
m, err := msgbus.NewMessageBus(nil)
m, err := msgbus.NewMessageBus()
if err != nil {
log.Fatal(err)
}

@ -4,6 +4,7 @@ import (
"compress/flate"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
@ -36,6 +37,12 @@ const (
pingPeriod = (pongWait * 9) / 10
)
var (
// BufferFull is logged in Subscribe() when a subscriber's
// buffer is full and messages can no longer be enqueued for delivery
ErrBufferFull = errors.New("error: subscriber buffer full")
)
// TODO: Make this configurable?
var upgrader = websocket.Upgrader{
ReadBufferSize: 4096,
@ -194,7 +201,7 @@ func (subs *Subscribers) NotifyAll(message Message) int {
default:
// TODO: Drop this client?
// TODO: Retry later?
log.Warnf("cannot publish message to %s: %+v", id, message)
log.Warnf("cannot publish message %s#%d to %s", message.Topic.Name, message.ID, id)
}
}
@ -217,7 +224,7 @@ type MessageBus struct {
// NewMessageBus creates a new message bus with the provided options
func NewMessageBus(opts ...Option) (*MessageBus, error) {
options := DefaultOptions
options := NewDefaultOptions()
for _, opt := range opts {
if err := opt(options); err != nil {
@ -581,32 +588,54 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
return ch
}
if o.Index == 0 {
// Index == 0 indicates to start from head
// so just return the channel
return ch
}
// IF client requests to start from index >= 0 (-1 from the head)
// AND the topic's sequence number hasn't been reset (< Index) THEN
if o.Index > 0 && o.Index <= t.Sequence {
var n int
log.Debugf("subscriber wants to start from %d", o.Index)
q.ForEach(func(item interface{}) error {
err := q.ForEach(func(item interface{}) error {
if msg, ok := item.(Message); ok && msg.ID >= o.Index {
log.Debugf("found msg #%d", msg.ID)
ch <- msg
n++
log.Debugf("found msg %s#%d", msg.Topic.Name, msg.ID)
select {
case ch <- msg:
n++
default:
mb.metrics.Counter("bus", "dropped").Inc()
return ErrBufferFull
}
}
return nil
})
if err != nil {
log.WithError(err).Error("error publishing messages to new subscriber")
}
log.Debugf("published %d messages", n)
} else {
// ELSE start from the beginning (invalid Index or Topic was reset)
// NB: This should not happen with write-ahead-logs (WAL)
var n int
log.Debugf("subscriber wanted to start from invalid %d (topic is at %d)", o.Index, t.Sequence)
q.ForEach(func(item interface{}) error {
err := q.ForEach(func(item interface{}) error {
if msg, ok := item.(Message); ok {
ch <- msg
n++
select {
case ch <- msg:
n++
default:
mb.metrics.Counter("bus", "dropped").Inc()
return ErrBufferFull
}
}
return nil
})
if err != nil {
log.WithError(err).Error("error publishing messages to new subscriber")
}
log.Debugf("published %d messages", n)
}

@ -146,6 +146,37 @@ func TestMessageBusSubscribe(t *testing.T) {
assert.Equal(expected, actual)
}
func TestMessageBusSubscribeFullBuffer(t *testing.T) {
require := require.New(t)
testdir, err := ioutil.TempDir("", "msgbus-logs-*")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir), WithBufferLength(2))
require.NoError(err)
topic := mb.NewTopic("hello")
mb.Put(mb.NewMessage(topic, []byte("foo"))) // ID == 1
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
msgs := mb.Subscribe("id1", "hello")
go func(ctx context.Context) {
select {
case <-msgs:
case <-ctx.Done():
return
}
}(ctx)
//err = mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
//require.NoError(err)
}
func TestMessageBusSubscribeWithIndex(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
@ -169,14 +200,14 @@ func TestMessageBusSubscribeWithIndex(t *testing.T) {
mb.Unsubscribe("id1", "foo")
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
require.NoError(mb.Put(mb.NewMessage(topic, []byte("bar")))) // ID == 2
require.NoError(mb.Put(mb.NewMessage(topic, []byte("baz")))) // ID == 3
msgs = mb.Subscribe("id1", "foo", WithIndex(1))
assert.Equal([]byte("foo"), (<-msgs).Payload)
assert.Equal([]byte("bar"), (<-msgs).Payload)
assert.Equal([]byte("baz"), (<-msgs).Payload)
assert.Equal("foo", string((<-msgs).Payload))
assert.Equal("bar", string((<-msgs).Payload))
assert.Equal("baz", string((<-msgs).Payload))
}
func TestMessageBusWAL(t *testing.T) {

@ -33,15 +33,17 @@ const (
DefaultNoSync = false
)
var DefaultOptions = &Options{
LogPath: DefaultLogPath,
func NewDefaultOptions() *Options {
return &Options{
LogPath: DefaultLogPath,
BufferLength: DefaultBufferLength,
MaxQueueSize: DefaultMaxQueueSize,
MaxPayloadSize: DefaultMaxPayloadSize,
BufferLength: DefaultBufferLength,
MaxQueueSize: DefaultMaxQueueSize,
MaxPayloadSize: DefaultMaxPayloadSize,
Metrics: DefaultMetrics,
NoSync: DefaultNoSync,
Metrics: DefaultMetrics,
NoSync: DefaultNoSync,
}
}
// Options ...

31
tools/test_mkdirtemp.go Normal file

@ -0,0 +1,31 @@
package main
import (
"fmt"
"log"
"os"
"path/filepath"
)
func main() {
logsDir, err := os.MkdirTemp("", "*-logs")
if err != nil {
log.Fatal(err)
}
defer os.RemoveAll(logsDir) // clean up
fmt.Printf("logsDir: %s", logsDir)
// Logs can be cleaned out earlier if needed by searching
// for all directories whose suffix ends in *-logs.
globPattern := filepath.Join(os.TempDir(), "*-logs")
matches, err := filepath.Glob(globPattern)
if err != nil {
log.Fatalf("Failed to match %q: %v", globPattern, err)
}
for _, match := range matches {
if err := os.RemoveAll(match); err != nil {
log.Printf("Failed to remove %q: %v", match, err)
}
}
}

BIN
tools/tools Executable file

Бінарний файл не відображається.