Fix off-by-one error in Queue.ForEach() (#41)
Fixes #40 and salty.im#24 🤞 Co-authored-by: James Mills <prologic@shortcircuit.net.au> Reviewed-on: https://git.mills.io/prologic/msgbus/pulls/41 Reviewed-by: xuu <xuu@noreply@mills.io>
This commit is contained in:
parent
f7bc71d0d0
commit
14a6294f27
26
msgbus.go
26
msgbus.go
|
@ -79,6 +79,10 @@ func LoadMessage(data []byte) (m Message, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (m Message) String() string {
|
||||
return fmt.Sprintf("msg#%d[%s]@%d(%q)", m.ID, m.Topic, m.Created.Unix(), string(m.Payload))
|
||||
}
|
||||
|
||||
func (m Message) Bytes() ([]byte, error) {
|
||||
return msgpack.Marshal(m)
|
||||
}
|
||||
|
@ -594,16 +598,20 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
|
|||
var n int
|
||||
log.Debugf("subscriber wants to start from %d", o.Index)
|
||||
err := q.ForEach(func(item interface{}) error {
|
||||
if msg, ok := item.(Message); ok && msg.ID >= o.Index {
|
||||
log.Debugf("found msg %s#%d", msg.Topic.Name, msg.ID)
|
||||
select {
|
||||
case ch <- msg:
|
||||
n++
|
||||
default:
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Counter("bus", "dropped").Inc()
|
||||
if msg, ok := item.(Message); ok {
|
||||
log.Debugf("found msg %s", msg)
|
||||
if msg.ID >= o.Index {
|
||||
select {
|
||||
case ch <- msg:
|
||||
n++
|
||||
default:
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Counter("bus", "dropped").Inc()
|
||||
}
|
||||
return ErrBufferFull
|
||||
}
|
||||
return ErrBufferFull
|
||||
} else {
|
||||
log.Debugf("msg %s before requested index %d", msg, o.Index)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -248,9 +248,9 @@ func TestMessageBusWAL(t *testing.T) {
|
|||
topic = mb.NewTopic("hello")
|
||||
assert.Equal(int64(3), topic.Sequence)
|
||||
|
||||
assert.Equal([]byte("foo"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("bar"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("baz"), (<-msgs).Payload)
|
||||
assert.Equal("foo", string((<-msgs).Payload))
|
||||
assert.Equal("bar", string((<-msgs).Payload))
|
||||
assert.Equal("baz", string((<-msgs).Payload))
|
||||
|
||||
msg := mb.NewMessage(topic, []byte("foobar"))
|
||||
assert.Equal(int64(4), msg.ID)
|
||||
|
|
19
queue.go
19
queue.go
|
@ -1,6 +1,8 @@
|
|||
package msgbus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
sync "github.com/sasha-s/go-deadlock"
|
||||
)
|
||||
|
||||
|
@ -113,7 +115,7 @@ func (q *Queue) Peek() interface{} {
|
|||
}
|
||||
|
||||
// ForEach applys the function `f` over each item in the queue for read-only
|
||||
// access into the queue in O(n) time for indexining into the queue.
|
||||
// access into the queue in O(n) time.
|
||||
func (q *Queue) ForEach(f func(elem interface{}) error) error {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
@ -122,10 +124,21 @@ func (q *Queue) ForEach(f func(elem interface{}) error) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
for i := 0; i < q.count; i++ {
|
||||
if err := f(q.buf[i]); err != nil {
|
||||
fmt.Printf("buf: #%v\n", q.buf)
|
||||
fmt.Printf("head: %d\n", q.head)
|
||||
fmt.Printf("tail: %d\n", q.tail)
|
||||
fmt.Printf("count: %d\n", q.count)
|
||||
|
||||
for i, n := 0, q.tail; ; i, n = i+1, q.next(n) {
|
||||
//for n := q.tail; q.next(n) != q.tail; n = q.next(n) {
|
||||
fmt.Printf("n: %d\n", n)
|
||||
if err := f(q.buf[n]); err != nil {
|
||||
return err
|
||||
}
|
||||
if q.next(n) == q.tail {
|
||||
//if i > 0 && (n == q.head || (q.head == q.tail && q.next(n) == q.head)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -30,6 +30,24 @@ func TestSimple(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSimpleOverflow(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
q := NewQueue(2)
|
||||
|
||||
q.Push(1) // dropped
|
||||
q.Push(2) // dropped
|
||||
q.Push(3)
|
||||
q.Push(4)
|
||||
|
||||
assert.Equal(4, q.Len())
|
||||
|
||||
assert.Equal(q.Peek(), 3)
|
||||
assert.Equal(q.Pop(), 3)
|
||||
assert.Equal(q.Peek(), 4)
|
||||
assert.Equal(q.Pop(), 4)
|
||||
}
|
||||
|
||||
func TestForEach(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
@ -40,7 +58,7 @@ func TestForEach(t *testing.T) {
|
|||
for _, y := range ys {
|
||||
q.Push(y)
|
||||
}
|
||||
assert.Equal(4, q.Len())
|
||||
assert.Equal(len(ys), q.Len())
|
||||
|
||||
var xs []int
|
||||
err := q.ForEach(func(e interface{}) error {
|
||||
|
@ -55,6 +73,39 @@ func TestForEach(t *testing.T) {
|
|||
assert.Equal(ys, xs)
|
||||
}
|
||||
|
||||
func TestForEachOverflow(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
maxlen := 4
|
||||
q := &Queue{maxlen: maxlen}
|
||||
|
||||
N := (maxlen + 1)
|
||||
var ys []int
|
||||
for i := 0; i < N; i++ {
|
||||
q.Push(i)
|
||||
ys = append(ys, i)
|
||||
t.Logf("pushed %d", i)
|
||||
t.Logf("q.Len() %d", q.Len())
|
||||
t.Logf("q.Size() %d", q.Size())
|
||||
}
|
||||
assert.False(q.Full())
|
||||
assert.Equal(N, q.Len())
|
||||
|
||||
var xs []int
|
||||
err := q.ForEach(func(e interface{}) error {
|
||||
i, ok := e.(int)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected type %T", e)
|
||||
}
|
||||
xs = append(xs, i)
|
||||
return nil
|
||||
})
|
||||
require.NoError(err)
|
||||
t.Logf("#%v == #%v\n", ys[(N-maxlen):], xs)
|
||||
assert.Equal(ys[(N-maxlen):], xs)
|
||||
}
|
||||
|
||||
func TestMaxLen(t *testing.T) {
|
||||
q := Queue{maxlen: minCapacity}
|
||||
assert.Equal(t, q.MaxLen(), minCapacity)
|
||||
|
|
Loading…
Reference in New Issue