Fix tests

This commit is contained in:
James Mills 2022-04-03 17:09:34 +10:00
parent e2e95bebac
commit 5b4cc735ec
No known key found for this signature in database
GPG Key ID: AC4C014F1440EBD6
2 changed files with 118 additions and 36 deletions

View File

@ -433,7 +433,7 @@ func (mb *MessageBus) NewMessage(topic *Topic, payload []byte) Message {
}()
return Message{
ID: topic.Sequence,
ID: topic.Sequence + 1,
Topic: topic,
Payload: payload,
Created: time.Now(),
@ -569,7 +569,7 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
// IF client requests to start from index >= 0 (-1 from the head)
// AND the topic's sequence number hasn't been reset (< Index) THEN
if o.Index >= 0 && o.Index <= t.Sequence {
if o.Index > 0 && o.Index <= t.Sequence {
var n int
log.Debugf("subscriber wants to start from %d", o.Index)
q.ForEach(func(item interface{}) error {
@ -583,6 +583,7 @@ func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan
log.Debugf("published %d messages", n)
} else {
// ELSE start from the beginning (invalid Index or Topic was reset)
// NB: This should not happen with write-ahead-logs (WAL)
var n int
log.Debugf("subscriber wanted to start from invalid %d (topic is at %d)", o.Index, t.Sequence)
q.ForEach(func(item interface{}) error {

View File

@ -36,14 +36,19 @@ func TestMessage(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
assert.Equal(0, mb.Len())
topic := mb.NewTopic("foo")
expected := mb.NewMessage(topic, []byte("bar"))
mb.Put(expected)
err = mb.Put(expected)
require.NoError(err)
actual, ok := mb.Get(topic)
require.True(ok)
@ -54,7 +59,11 @@ func TestMessageIds(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
assert.Equal(0, mb.Len())
@ -70,15 +79,18 @@ func TestMessageIds(t *testing.T) {
mb.Put(mb.NewMessage(topic, []byte("bar")))
msg, ok := mb.Get(topic)
require.True(ok)
assert.Equal(msg.ID, 1)
assert.Equal(msg.ID, int64(2))
}
func TestMessageGetEmpty(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
assert.Equal(0, mb.Len())
@ -93,7 +105,11 @@ func TestMessageBusPutGet(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
topic := mb.NewTopic("foo")
@ -109,7 +125,11 @@ func TestMessageBusSubscribe(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
msgs := mb.Subscribe("id1", "foo")
@ -126,26 +146,31 @@ func TestMessageBusSubscribeWithIndex(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
msgs := mb.Subscribe("id1", "foo")
topic := mb.NewTopic("foo")
expected := mb.NewMessage(topic, []byte("foo"))
expected := mb.NewMessage(topic, []byte("foo")) // ID == 1
mb.Put(expected)
actual := <-msgs
assert.Equal(expected, actual)
assert.Equal(0, actual.ID)
assert.Equal(int64(1), actual.ID)
mb.Unsubscribe("id1", "foo")
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 1
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
msgs = mb.Subscribe("id1", "foo", WithIndex(1))
assert.Equal([]byte("foo"), (<-msgs).Payload)
assert.Equal([]byte("bar"), (<-msgs).Payload)
assert.Equal([]byte("baz"), (<-msgs).Payload)
}
@ -164,15 +189,15 @@ func TestMessageBusWAL(t *testing.T) {
msgs := mb.Subscribe("id1", "hello")
topic := mb.NewTopic("hello")
mb.Put(mb.NewMessage(topic, []byte("foo"))) // ID == 0
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 1
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("foo"))) // ID == 1
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 2
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 3
assert.Equal([]byte("foo"), (<-msgs).Payload)
assert.Equal([]byte("bar"), (<-msgs).Payload)
assert.Equal([]byte("baz"), (<-msgs).Payload)
assert.Equal(3, topic.Sequence)
assert.Equal(int64(3), topic.Sequence)
mb.Unsubscribe("id1", "foo")
@ -183,24 +208,28 @@ func TestMessageBusWAL(t *testing.T) {
require.NoError(err)
// we have to tell the bus we want to subscribe from the start
msgs = mb.Subscribe("id1", "hello", WithIndex(0))
msgs = mb.Subscribe("id1", "hello", WithIndex(1))
topic = mb.NewTopic("hello")
assert.Equal(3, topic.Sequence)
assert.Equal(int64(3), topic.Sequence)
assert.Equal([]byte("foo"), (<-msgs).Payload)
assert.Equal([]byte("bar"), (<-msgs).Payload)
assert.Equal([]byte("baz"), (<-msgs).Payload)
msg := mb.NewMessage(topic, []byte("foobar"))
assert.Equal(4, msg.ID)
assert.Equal(int64(4), msg.ID)
}
func TestServeHTTPGETIndexEmpty(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus(nil)
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
w := httptest.NewRecorder()
@ -215,7 +244,11 @@ func TestServeHTTPGETTopics(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
mb.Put(mb.NewMessage(mb.NewTopic("foo"), []byte("foo")))
@ -234,7 +267,11 @@ func TestServeHTTPGETEmptyQueue(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
w := httptest.NewRecorder()
@ -248,7 +285,11 @@ func TestServeHTTPPOST(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
w := httptest.NewRecorder()
@ -263,7 +304,11 @@ func TestServeHTTPMaxPayloadSize(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
w := httptest.NewRecorder()
@ -279,7 +324,11 @@ func TestServeHTTPSimple(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
w := httptest.NewRecorder()
@ -305,7 +354,11 @@ func TestServeHTTPSimple(t *testing.T) {
func BenchmarkServeHTTPPOST(b *testing.B) {
require := require.New(b)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
b.ResetTimer()
@ -322,7 +375,11 @@ func TestServeHTTPSubscriber(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
s := httptest.NewServer(mb)
@ -367,7 +424,11 @@ func TestServeHTTPSubscriberReconnect(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
s := httptest.NewServer(mb)
@ -414,7 +475,11 @@ func TestMsgBusMetrics(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb, err := NewMessageBus(WithMetrics(true))
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
assert.IsType(&Metrics{}, mb.Metrics())
@ -423,7 +488,11 @@ func TestMsgBusMetrics(t *testing.T) {
func BenchmarkMessageBusPut(b *testing.B) {
require := require.New(b)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
topic := mb.NewTopic("foo")
@ -437,7 +506,11 @@ func BenchmarkMessageBusPut(b *testing.B) {
func BenchmarkMessageBusGet(b *testing.B) {
require := require.New(b)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
topic := mb.NewTopic("foo")
@ -454,7 +527,11 @@ func BenchmarkMessageBusGet(b *testing.B) {
func BenchmarkMessageBusGetEmpty(b *testing.B) {
require := require.New(b)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
topic := mb.NewTopic("foo")
@ -467,7 +544,11 @@ func BenchmarkMessageBusGetEmpty(b *testing.B) {
func BenchmarkMessageBusPutGet(b *testing.B) {
require := require.New(b)
mb, err := NewMessageBus()
testdir, err := ioutil.TempDir("", "msgbus-logs")
require.NoError(err)
defer os.RemoveAll(testdir)
mb, err := NewMessageBus(WithLogPath(testdir))
require.NoError(err)
topic := mb.NewTopic("foo")