Fix more broken shit

This commit is contained in:
James Mills 2022-04-03 18:17:55 +10:00
parent 5b4cc735ec
commit d1f175699f
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6

View File

@ -66,12 +66,12 @@ type Message struct {
Created time.Time `json:"created"`
}
func LoadMessage(data []byte) (m *Message, err error) {
func LoadMessage(data []byte) (m Message, err error) {
err = json.Unmarshal(data, &m)
return
}
func (m *Message) Bytes() ([]byte, error) {
func (m Message) Bytes() ([]byte, error) {
return json.Marshal(m)
}
@ -331,26 +331,39 @@ func NewMessageBus(opts ...Option) (*MessageBus, error) {
func (mb *MessageBus) readLog(f fs.FileInfo) error {
log.Debugf("reading log %s", f.Name())
l, err := wal.Open(f.Name(), nil)
l, err := wal.Open(filepath.Join(mb.options.LogPath, f.Name()), nil)
if err != nil {
return fmt.Errorf("error opening log %s: %w", f, err)
}
t := mb.newTopic(filepath.Base(f.Name()))
t := mb.newTopic(f.Name())
t.Created = f.ModTime()
index, err := l.LastIndex()
first, err := l.FirstIndex()
if err != nil {
return fmt.Errorf("error reading first index: %w", err)
}
last, err := l.LastIndex()
if err != nil {
return fmt.Errorf("error reading last index: %w", err)
}
t.Sequence = int64(index)
log.Debugf("first index: %d", first)
log.Debugf("last index: %d", last)
t.Sequence = int64(last)
q := NewQueue(mb.options.MaxQueueSize)
start := (index - uint64(mb.options.MaxQueueSize))
end := index
start := int64(last) - int64(mb.options.MaxQueueSize)
if start < 0 {
start = int64(first)
}
end := int64(last)
log.Debugf("start: %d", start)
log.Debugf("end: %d", end)
for i := start; i <= end; i++ {
data, err := l.Read(i)
data, err := l.Read(uint64(i))
if err != nil {
return fmt.Errorf("error reading log %d: %w", i, err)
}