Fix the logic bug in ForEach finally
This commit is contained in:
父節點
021e7011ae
當前提交
0edef43088
24
msgbus.go
24
msgbus.go
|
@ -80,7 +80,7 @@ func LoadMessage(data []byte) (m Message, err error) {
|
|||
}
|
||||
|
||||
func (m Message) String() string {
|
||||
return fmt.Sprintf("msg#%d@%d(%q)", m.ID, m.Created.Unix(), string(m.Payload))
|
||||
return fmt.Sprintf("msg#%d[%s]@%d(%q)", m.ID, m.Topic, m.Created.Unix(), string(m.Payload))
|
||||
}
|
||||
|
||||
func (m Message) Bytes() ([]byte, error) {
|
||||
|
@ -598,16 +598,20 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
|
|||
var n int
|
||||
log.Debugf("subscriber wants to start from %d", o.Index)
|
||||
err := q.ForEach(func(item interface{}) error {
|
||||
if msg, ok := item.(Message); ok && msg.ID >= o.Index {
|
||||
log.Debugf("found msg %s#%d", msg.Topic.Name, msg.ID)
|
||||
select {
|
||||
case ch <- msg:
|
||||
n++
|
||||
default:
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Counter("bus", "dropped").Inc()
|
||||
if msg, ok := item.(Message); ok {
|
||||
log.Debugf("found msg %s", msg)
|
||||
if msg.ID >= o.Index {
|
||||
select {
|
||||
case ch <- msg:
|
||||
n++
|
||||
default:
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Counter("bus", "dropped").Inc()
|
||||
}
|
||||
return ErrBufferFull
|
||||
}
|
||||
return ErrBufferFull
|
||||
} else {
|
||||
log.Debugf("msg %s before requested index %d", msg, o.Index)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -248,9 +248,9 @@ func TestMessageBusWAL(t *testing.T) {
|
|||
topic = mb.NewTopic("hello")
|
||||
assert.Equal(int64(3), topic.Sequence)
|
||||
|
||||
assert.Equal([]byte("foo"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("bar"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("baz"), (<-msgs).Payload)
|
||||
assert.Equal("foo", string((<-msgs).Payload))
|
||||
assert.Equal("bar", string((<-msgs).Payload))
|
||||
assert.Equal("baz", string((<-msgs).Payload))
|
||||
|
||||
msg := mb.NewMessage(topic, []byte("foobar"))
|
||||
assert.Equal(int64(4), msg.ID)
|
||||
|
|
10
queue.go
10
queue.go
|
@ -129,10 +129,16 @@ func (q *Queue) ForEach(f func(elem interface{}) error) error {
|
|||
fmt.Printf("tail: %d\n", q.tail)
|
||||
fmt.Printf("count: %d\n", q.count)
|
||||
|
||||
for i := 0; i < (q.count - 1); i++ {
|
||||
if err := f(q.buf[q.next(i)]); err != nil {
|
||||
for i, n := 0, q.tail; ; i, n = i+1, q.next(n) {
|
||||
//for n := q.tail; q.next(n) != q.tail; n = q.next(n) {
|
||||
fmt.Printf("n: %d\n", n)
|
||||
if err := f(q.buf[n]); err != nil {
|
||||
return err
|
||||
}
|
||||
if q.next(n) == q.tail {
|
||||
//if i > 0 && (n == q.head || (q.head == q.tail && q.next(n) == q.head)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -30,6 +30,24 @@ func TestSimple(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSimpleOverflow(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
q := NewQueue(2)
|
||||
|
||||
q.Push(1) // dropped
|
||||
q.Push(2) // dropped
|
||||
q.Push(3)
|
||||
q.Push(4)
|
||||
|
||||
assert.Equal(4, q.Len())
|
||||
|
||||
assert.Equal(q.Peek(), 3)
|
||||
assert.Equal(q.Pop(), 3)
|
||||
assert.Equal(q.Peek(), 4)
|
||||
assert.Equal(q.Pop(), 4)
|
||||
}
|
||||
|
||||
func TestForEach(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
@ -40,7 +58,7 @@ func TestForEach(t *testing.T) {
|
|||
for _, y := range ys {
|
||||
q.Push(y)
|
||||
}
|
||||
assert.Equal(4, q.Len())
|
||||
assert.Equal(len(ys), q.Len())
|
||||
|
||||
var xs []int
|
||||
err := q.ForEach(func(e interface{}) error {
|
||||
|
|
載入中…
新增問題並參考