Fix case for invalid Index to handle Topic resets (msgbus restarts or crashes)

This commit is contained in:
James Mills 2022-04-03 08:16:25 +10:00
parent 14b5443f50
commit ff6225ca00
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6

View File

@ -480,7 +480,9 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
return ch
}
if o.Index >= 0 && o.Index <= q.Len() {
// IF client requests to start from index >= 0 (-1 from the head)
// AND the topic's sequence number hasn't been reset (< Index) THEN
if o.Index >= 0 && o.Index <= t.Sequence {
var n int
log.Debugf("subscriber wants to start from %d", o.Index)
q.ForEach(func(item interface{}) error {
@ -492,6 +494,18 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
return nil
})
log.Debugf("published %d messages", n)
} else {
// ELSE start from the beginning (invalid Index or Topic was reset)
var n int
log.Debugf("subscriber wanted to start from invalid %d (topic is at %d)", o.Index, t.Sequence)
q.ForEach(func(item interface{}) error {
if msg, ok := item.(Message); ok {
ch <- msg
n++
}
return nil
})
log.Debugf("published %d messages", n)
}
return ch