Fix a bug in client not tracking topic index when using indexing

This commit is contained in:
James Mills 2022-04-04 12:34:51 +10:00
parent 9e489aadfc
commit 44badd9730
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
2 changed files with 37 additions and 48 deletions

View File

@ -156,7 +156,6 @@ type Subscriber struct {
handler msgbus.HandlerFunc
url string
reconnectInterval time.Duration
maxReconnectInterval time.Duration
}
@ -167,36 +166,48 @@ func NewSubscriber(client *Client, topic string, index int64, handler msgbus.Han
handler = noopHandler
}
u, err := url.Parse(client.url)
if err != nil {
log.Fatalf("invalid url: %s", client.url)
}
if strings.HasPrefix(client.url, "https") {
u.Scheme = "wss"
} else {
u.Scheme = "ws"
}
u.Path += fmt.Sprintf("/%s", topic)
q := u.Query()
q.Set("index", strconv.FormatInt(index, 10))
u.RawQuery = q.Encode()
url := u.String()
return &Subscriber{
client: client,
topic: topic,
index: index,
handler: handler,
url: url,
reconnectInterval: client.reconnectInterval,
maxReconnectInterval: client.maxReconnectInterval,
}
}
func (s *Subscriber) url() string {
u, err := url.Parse(s.client.url)
if err != nil {
log.Fatalf("invalid url: %s", s.client.url)
}
if strings.HasPrefix(s.client.url, "https") {
u.Scheme = "wss"
} else {
u.Scheme = "ws"
}
u.Path += fmt.Sprintf("/%s", s.topic)
q := u.Query()
q.Set("index", strconv.FormatInt(s.index, 10))
u.RawQuery = q.Encode()
return u.String()
}
func (s *Subscriber) maybeUpdateIndex(msg *msgbus.Message) {
s.Lock()
defer s.Unlock()
if s.index > 0 {
log.Debugf("updating index from %d to %d", s.index, (msg.ID + 1))
// NB: We update to index +1 so we don't keep getting the previous message(
s.index = msg.ID + 1
}
}
func (s *Subscriber) closeAndReconnect() {
s.RLock()
if s.conn != nil {
@ -219,7 +230,7 @@ func (s *Subscriber) connect() {
ctx := context.Background()
for {
conn, _, err := websocket.Dial(ctx, s.url, nil)
conn, _, err := websocket.Dial(ctx, s.url(), nil)
if err != nil {
time.Sleep(b.Duration())
continue
@ -245,6 +256,7 @@ func (s *Subscriber) readLoop(ctx context.Context) {
s.closeAndReconnect()
return
}
s.maybeUpdateIndex(msg)
if err := s.handler(msg); err != nil {
log.Warnf("error handling message: %s", err)

View File

@ -588,12 +588,6 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
return ch
}
if o.Index == 0 {
// Index == 0 indicates to start from head
// so just return the channel
return ch
}
// IF client requests to start from index >= 0 (-1 from the head)
// AND the topic's sequence number hasn't been reset (< Index) THEN
if o.Index > 0 && o.Index <= t.Sequence {
@ -616,29 +610,12 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
log.WithError(err).Error("error publishing messages to new subscriber")
}
log.Debugf("published %d messages", n)
} else {
// ELSE start from the beginning (invalid Index or Topic was reset)
// NB: This should not happen with write-ahead-logs (WAL)
var n int
log.Debugf("subscriber wanted to start from invalid %d (topic is at %d)", o.Index, t.Sequence)
err := q.ForEach(func(item interface{}) error {
if msg, ok := item.(Message); ok {
select {
case ch <- msg:
n++
default:
mb.metrics.Counter("bus", "dropped").Inc()
return ErrBufferFull
}
}
return nil
})
if err != nil {
log.WithError(err).Error("error publishing messages to new subscriber")
}
log.Debugf("published %d messages", n)
return ch
}
// Otherwise, s.Index was eitehr 0 (start from head)
// OR > topic.Sequence in which case (start from head)
return ch
}