6
0
mirror of https://git.mills.io/prologic/msgbus.git synced 2024-06-25 00:09:08 +00:00
prologic-msgbus/msgbus_test.go

315 lines
6.4 KiB
Go
Raw Normal View History

2017-06-03 15:16:17 +00:00
package msgbus
import (
"bytes"
2022-03-27 04:47:14 +00:00
"context"
"encoding/json"
"net/http"
"net/http/httptest"
2017-06-03 15:16:17 +00:00
"testing"
"github.com/stretchr/testify/assert"
2022-04-02 04:16:49 +00:00
"github.com/stretchr/testify/require"
2022-03-27 04:47:14 +00:00
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
2017-06-03 15:16:17 +00:00
)
func TestMessageBusLen(t *testing.T) {
mb := New(nil)
2017-06-03 15:16:17 +00:00
assert.Equal(t, mb.Len(), 0)
}
func TestMessage(t *testing.T) {
mb := New(nil)
2017-06-03 15:16:17 +00:00
assert.Equal(t, mb.Len(), 0)
2017-08-20 01:26:15 +00:00
topic := mb.NewTopic("foo")
2017-08-10 07:30:45 +00:00
expected := Message{Topic: topic, Payload: []byte("bar")}
2017-08-20 01:26:15 +00:00
mb.Put(expected)
2017-06-03 15:16:17 +00:00
actual, ok := mb.Get(topic)
assert.True(t, ok)
assert.Equal(t, actual, expected)
}
func TestMessageGetEmpty(t *testing.T) {
mb := New(nil)
2017-06-03 15:16:17 +00:00
assert.Equal(t, mb.Len(), 0)
2017-08-20 01:26:15 +00:00
topic := mb.NewTopic("foo")
2017-06-03 15:16:17 +00:00
msg, ok := mb.Get(topic)
assert.False(t, ok)
2017-08-10 07:30:45 +00:00
assert.Equal(t, msg, Message{})
2017-06-03 15:16:17 +00:00
}
2018-04-07 06:34:28 +00:00
func TestMessageBusPutGet(t *testing.T) {
mb := New(nil)
2018-04-07 06:34:28 +00:00
topic := mb.NewTopic("foo")
expected := Message{Topic: topic, Payload: []byte("foo")}
mb.Put(expected)
actual, ok := mb.Get(topic)
assert.True(t, ok)
assert.Equal(t, actual, expected)
}
2022-04-02 04:16:49 +00:00
func TestMessageBusSubscribe(t *testing.T) {
mb := New(nil)
msgs := mb.Subscribe("id1", "foo")
topic := mb.NewTopic("foo")
expected := Message{Topic: topic, Payload: []byte("foo")}
mb.Put(expected)
actual := <-msgs
assert.Equal(t, actual, expected)
}
func TestServeHTTPGETIndexEmpty(t *testing.T) {
assert := assert.New(t)
mb := New(nil)
w := httptest.NewRecorder()
r, _ := http.NewRequest("GET", "/", nil)
mb.ServeHTTP(w, r)
assert.Equal(w.Code, http.StatusOK)
assert.Equal(w.Body.String(), "{}")
}
2018-05-14 10:12:07 +00:00
func TestServeHTTPGETTopics(t *testing.T) {
assert := assert.New(t)
mb := New(nil)
mb.Put(Message{Topic: mb.NewTopic("foo"), Payload: []byte("foo")})
mb.Put(Message{Topic: mb.NewTopic("hello"), Payload: []byte("hello world")})
w := httptest.NewRecorder()
r, _ := http.NewRequest("GET", "/", nil)
mb.ServeHTTP(w, r)
assert.Equal(w.Code, http.StatusOK)
2018-05-14 10:25:00 +00:00
assert.Contains(w.Body.String(), "foo")
assert.Contains(w.Body.String(), "hello")
2018-05-14 10:12:07 +00:00
}
func TestServeHTTPGETEmptyQueue(t *testing.T) {
assert := assert.New(t)
mb := New(nil)
w := httptest.NewRecorder()
r, _ := http.NewRequest("GET", "/hello", nil)
mb.ServeHTTP(w, r)
assert.Equal(w.Code, http.StatusNoContent)
2018-05-14 10:12:07 +00:00
}
func TestServeHTTPPOST(t *testing.T) {
assert := assert.New(t)
mb := New(nil)
w := httptest.NewRecorder()
b := bytes.NewBufferString("hello world")
r, _ := http.NewRequest("POST", "/hello", b)
mb.ServeHTTP(w, r)
2018-05-14 10:04:45 +00:00
assert.Equal(w.Code, http.StatusAccepted)
}
2018-05-11 07:41:52 +00:00
func TestServeHTTPMaxPayloadSize(t *testing.T) {
assert := assert.New(t)
mb := New(nil)
w := httptest.NewRecorder()
b := bytes.NewBuffer(bytes.Repeat([]byte{'X'}, (DefaultMaxPayloadSize * 2)))
r, _ := http.NewRequest("POST", "/hello", b)
mb.ServeHTTP(w, r)
assert.Equal(http.StatusRequestEntityTooLarge, w.Code)
assert.Regexp(`payload exceeds max-payload-size`, w.Body.String())
}
func TestServeHTTPSimple(t *testing.T) {
assert := assert.New(t)
mb := New(nil)
w := httptest.NewRecorder()
b := bytes.NewBufferString("hello world")
r, _ := http.NewRequest("POST", "/hello", b)
mb.ServeHTTP(w, r)
2018-05-14 10:04:45 +00:00
assert.Equal(w.Code, http.StatusAccepted)
w = httptest.NewRecorder()
r, _ = http.NewRequest("GET", "/hello", nil)
mb.ServeHTTP(w, r)
assert.Equal(w.Code, http.StatusOK)
var msg *Message
json.Unmarshal(w.Body.Bytes(), &msg)
assert.Equal(msg.ID, uint64(0))
assert.Equal(msg.Topic.Name, "hello")
assert.Equal(msg.Payload, []byte("hello world"))
}
func BenchmarkServeHTTPPOST(b *testing.B) {
mb := New(nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
w := httptest.NewRecorder()
b := bytes.NewBufferString("hello world")
r, _ := http.NewRequest("POST", "/hello", b)
mb.ServeHTTP(w, r)
}
}
func TestServeHTTPSubscriber(t *testing.T) {
assert := assert.New(t)
2022-04-02 04:16:49 +00:00
require := require.New(t)
mb := New(nil)
s := httptest.NewServer(mb)
defer s.Close()
msgs := make(chan *Message)
ready := make(chan bool, 1)
consumer := func() {
var msg *Message
2022-03-27 04:47:14 +00:00
// u := fmt.Sprintf("ws%s/hello", strings.TrimPrefix(s.URL, "http"))
ws, _, err := websocket.Dial(context.Background(), s.URL+"/hello", nil)
2022-04-02 04:16:49 +00:00
require.NoError(err)
defer ws.Close(websocket.StatusNormalClosure, "")
ready <- true
err = wsjson.Read(context.Background(), ws, &msg)
require.NoError(err)
msgs <- msg
}
go consumer()
<-ready
c := s.Client()
b := bytes.NewBufferString("hello world")
r, err := c.Post(s.URL+"/hello", "text/plain", b)
require.NoError(err)
defer r.Body.Close()
msg := <-msgs
assert.Equal(msg.ID, uint64(0))
assert.Equal(msg.Topic.Name, "hello")
assert.Equal(msg.Payload, []byte("hello world"))
}
2022-04-02 04:16:49 +00:00
func TestServeHTTPSubscriberReconnect(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
mb := New(nil)
s := httptest.NewServer(mb)
msgs := make(chan *Message)
ready := make(chan bool, 1)
consumer := func() {
var msg *Message
ws, _, err := websocket.Dial(context.Background(), s.URL+"/hello", nil)
require.NoError(err)
2022-03-27 04:47:14 +00:00
defer ws.Close(websocket.StatusNormalClosure, "")
ready <- true
2022-04-02 04:16:49 +00:00
err = wsjson.Read(context.Background(), ws, &msg)
require.NoError(err)
msgs <- msg
}
go consumer()
<-ready
2022-04-02 04:19:17 +00:00
s.Close()
s = httptest.NewServer(mb)
defer s.Close()
c := s.Client()
b := bytes.NewBufferString("hello world")
r, err := c.Post(s.URL+"/hello", "text/plain", b)
2022-04-02 04:16:49 +00:00
require.NoError(err)
defer r.Body.Close()
msg := <-msgs
assert.Equal(msg.ID, uint64(0))
assert.Equal(msg.Topic.Name, "hello")
assert.Equal(msg.Payload, []byte("hello world"))
}
2018-05-11 07:09:58 +00:00
func TestMsgBusMetrics(t *testing.T) {
assert := assert.New(t)
opts := Options{
WithMetrics: true,
}
mb := New(&opts)
assert.IsType(&Metrics{}, mb.Metrics())
}
2017-06-03 15:16:17 +00:00
func BenchmarkMessageBusPut(b *testing.B) {
mb := New(nil)
2017-08-20 01:26:15 +00:00
topic := mb.NewTopic("foo")
msg := Message{Topic: topic, Payload: []byte("foo")}
2017-06-03 15:16:17 +00:00
b.ResetTimer()
for i := 0; i < b.N; i++ {
2017-08-20 01:26:15 +00:00
mb.Put(msg)
2017-06-03 15:16:17 +00:00
}
}
func BenchmarkMessageBusGet(b *testing.B) {
mb := New(nil)
2017-08-20 01:26:15 +00:00
topic := mb.NewTopic("foo")
2018-03-03 19:42:50 +00:00
msg := Message{Topic: topic, Payload: []byte("foo")}
2017-06-03 15:16:17 +00:00
for i := 0; i < b.N; i++ {
2017-08-20 01:26:15 +00:00
mb.Put(msg)
2017-06-03 15:16:17 +00:00
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
mb.Get(topic)
}
}
func BenchmarkMessageBusGetEmpty(b *testing.B) {
mb := New(nil)
2017-08-20 01:26:15 +00:00
topic := mb.NewTopic("foo")
2017-06-03 15:16:17 +00:00
b.ResetTimer()
for i := 0; i < b.N; i++ {
mb.Get(topic)
}
}
func BenchmarkMessageBusPutGet(b *testing.B) {
mb := New(nil)
2017-08-20 01:26:15 +00:00
topic := mb.NewTopic("foo")
2018-03-03 19:42:50 +00:00
msg := Message{Topic: topic, Payload: []byte("foo")}
2017-06-03 15:16:17 +00:00
b.ResetTimer()
for i := 0; i < b.N; i++ {
2017-08-20 01:26:15 +00:00
mb.Put(msg)
2017-06-03 15:16:17 +00:00
mb.Get(topic)
}
}