Implement the other sdie

This commit is contained in:
James Mills 2022-04-03 16:46:10 +10:00
parent 55f774447d
commit de4e7a1c4c
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6

View File

@ -6,8 +6,11 @@ import (
"encoding/json"
"fmt"
"io"
"io/fs"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
@ -307,7 +310,7 @@ func NewMessageBus(opts ...Option) (*MessageBus, error) {
)
}
return &MessageBus{
mb := &MessageBus{
options: options,
metrics: metrics,
@ -316,7 +319,74 @@ func NewMessageBus(opts ...Option) (*MessageBus, error) {
logs: make(map[*Topic]*wal.Log),
subscribers: make(map[*Topic]*Subscribers),
}, nil
}
if err := mb.readLogs(); err != nil {
return nil, fmt.Errorf("error reading logs: %w", err)
}
return mb, nil
}
func (mb *MessageBus) readLog(f fs.FileInfo) error {
l, err := wal.Open(f.Name(), nil)
if err != nil {
return fmt.Errorf("error opening log %s: %w", f, err)
}
t := mb.NewTopic(filepath.Base(f.Name()))
t.Created = f.ModTime()
index, err := l.LastIndex()
if err != nil {
return fmt.Errorf("error reading last index: %w", err)
}
t.Sequence = int64(index)
q := NewQueue(mb.options.MaxQueueSize)
start := (index - uint64(mb.options.MaxQueueSize))
end := index
for i := start; i <= end; i++ {
data, err := l.Read(i)
if err != nil {
return fmt.Errorf("error reading log %d: %w", i, err)
}
msg, err := LoadMessage(data)
if err != nil {
return fmt.Errorf("error deserialing log %d: %w", i, err)
}
q.Push(msg)
}
mb.queues[t] = q
return nil
}
func (mb *MessageBus) readLogs() error {
mb.Lock()
defer mb.Unlock()
dirs, err := os.ReadDir(mb.options.LogPath)
if err != nil {
return fmt.Errorf("error listing logs: %w", err)
}
for _, dir := range dirs {
if dir.IsDir() {
info, err := dir.Info()
if err != nil {
return fmt.Errorf("error reading log path %s: %w", dir, err)
}
if err := mb.readLog(info); err != nil {
return err
}
}
}
return nil
}
// Len ...