Fix case for invalid Index to handle Topic resets (msgbus restarts or crashes) (#30)
Co-authored-by: James Mills <prologic@shortcircuit.net.au> Reviewed-on: https://git.mills.io/prologic/msgbus/pulls/30
This commit is contained in:
parent
14b5443f50
commit
429361c85c
16
msgbus.go
16
msgbus.go
|
@ -480,7 +480,9 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
|
|||
return ch
|
||||
}
|
||||
|
||||
if o.Index >= 0 && o.Index <= q.Len() {
|
||||
// IF client requests to start from index >= 0 (-1 from the head)
|
||||
// AND the topic's sequence number hasn't been reset (< Index) THEN
|
||||
if o.Index >= 0 && o.Index <= t.Sequence {
|
||||
var n int
|
||||
log.Debugf("subscriber wants to start from %d", o.Index)
|
||||
q.ForEach(func(item interface{}) error {
|
||||
|
@ -492,6 +494,18 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
|
|||
return nil
|
||||
})
|
||||
log.Debugf("published %d messages", n)
|
||||
} else {
|
||||
// ELSE start from the beginning (invalid Index or Topic was reset)
|
||||
var n int
|
||||
log.Debugf("subscriber wanted to start from invalid %d (topic is at %d)", o.Index, t.Sequence)
|
||||
q.ForEach(func(item interface{}) error {
|
||||
if msg, ok := item.(Message); ok {
|
||||
ch <- msg
|
||||
n++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
log.Debugf("published %d messages", n)
|
||||
}
|
||||
|
||||
return ch
|
||||
|
|
Loading…
Reference in New Issue