spogulis no https://git.mills.io/prologic/msgbus.git
Add support for subscribers to start from an index
This commit is contained in:
vecāks
4bbe613486
revīzija
abe0d5e972
|
@ -8,10 +8,10 @@ import (
|
|||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jpillora/backoff"
|
||||
sync "github.com/sasha-s/go-deadlock"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
|
@ -138,8 +138,8 @@ func (c *Client) Publish(topic, message string) error {
|
|||
}
|
||||
|
||||
// Subscribe ...
|
||||
func (c *Client) Subscribe(topic string, handler msgbus.HandlerFunc) *Subscriber {
|
||||
return NewSubscriber(c, topic, handler)
|
||||
func (c *Client) Subscribe(topic string, index int, handler msgbus.HandlerFunc) *Subscriber {
|
||||
return NewSubscriber(c, topic, index, handler)
|
||||
}
|
||||
|
||||
// Subscriber ...
|
||||
|
@ -150,7 +150,9 @@ type Subscriber struct {
|
|||
|
||||
client *Client
|
||||
|
||||
topic string
|
||||
topic string
|
||||
index int
|
||||
|
||||
handler msgbus.HandlerFunc
|
||||
|
||||
url string
|
||||
|
@ -159,7 +161,7 @@ type Subscriber struct {
|
|||
}
|
||||
|
||||
// NewSubscriber ...
|
||||
func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Subscriber {
|
||||
func NewSubscriber(client *Client, topic string, index int, handler msgbus.HandlerFunc) *Subscriber {
|
||||
if handler == nil {
|
||||
handler = noopHandler
|
||||
}
|
||||
|
@ -175,13 +177,14 @@ func NewSubscriber(client *Client, topic string, handler msgbus.HandlerFunc) *Su
|
|||
u.Scheme = "ws"
|
||||
}
|
||||
|
||||
u.Path += fmt.Sprintf("/%s", topic)
|
||||
u.Path += fmt.Sprintf("/%s?index=%d", topic, index)
|
||||
|
||||
url := u.String()
|
||||
|
||||
return &Subscriber{
|
||||
client: client,
|
||||
topic: topic,
|
||||
index: index,
|
||||
handler: handler,
|
||||
|
||||
url: url,
|
||||
|
|
|
@ -23,13 +23,20 @@ var subCmd = &cobra.Command{
|
|||
Short: "Subscribe to a topic",
|
||||
Long: `This subscribes to the given topic and for every message published
|
||||
to the topic, the message is printed to standard output (default) or the
|
||||
supplied command is executed with the contents of the message as stdin.`,
|
||||
supplied command is executed with the contents of the message as stdin.
|
||||
|
||||
If the -i/--index option is supplied with a valid value (>= 0) then the
|
||||
subscription will start from that position in the topic's sequence
|
||||
(which are monotonic increasing integers). It is the responsibility of
|
||||
the client to keep track of its last index into a topic and indexes
|
||||
reset to zero on message bus restarts.`,
|
||||
Args: cobra.MinimumNArgs(1),
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
uri := viper.GetString("uri")
|
||||
client := client.NewClient(uri, nil)
|
||||
|
||||
topic := args[0]
|
||||
index := viper.GetInt("index")
|
||||
|
||||
var (
|
||||
command string
|
||||
|
@ -40,12 +47,17 @@ supplied command is executed with the contents of the message as stdin.`,
|
|||
args = args[2:]
|
||||
}
|
||||
|
||||
subscribe(client, topic, command, args)
|
||||
subscribe(client, topic, index, command, args)
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.AddCommand(subCmd)
|
||||
|
||||
subCmd.Flags().IntP(
|
||||
"index", "i", -1,
|
||||
"position in the topic's sequence to start subscribing from (-1 indicates end)",
|
||||
)
|
||||
}
|
||||
|
||||
func handler(command string, args []string) msgbus.HandlerFunc {
|
||||
|
@ -86,12 +98,12 @@ func handler(command string, args []string) msgbus.HandlerFunc {
|
|||
}
|
||||
}
|
||||
|
||||
func subscribe(client *client.Client, topic, command string, args []string) {
|
||||
func subscribe(client *client.Client, topic string, index int, command string, args []string) {
|
||||
if topic == "" {
|
||||
topic = defaultTopic
|
||||
}
|
||||
|
||||
s := client.Subscribe(topic, handler(command, args))
|
||||
s := client.Subscribe(topic, index, handler(command, args))
|
||||
s.Start()
|
||||
|
||||
sigs := make(chan os.Signal, 1)
|
||||
|
|
2
go.mod
2
go.mod
|
@ -9,6 +9,7 @@ require (
|
|||
github.com/mitchellh/go-homedir v1.1.0
|
||||
github.com/mmcloughlin/professor v0.0.0-20170922221822-6b97112ab8b3
|
||||
github.com/prometheus/client_golang v1.12.1
|
||||
github.com/sasha-s/go-deadlock v0.3.1
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/spf13/cobra v1.4.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
|
@ -31,6 +32,7 @@ require (
|
|||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
||||
github.com/mitchellh/mapstructure v1.4.3 // indirect
|
||||
github.com/pelletier/go-toml v1.9.4 // indirect
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_model v0.2.0 // indirect
|
||||
github.com/prometheus/common v0.32.1 // indirect
|
||||
|
|
4
go.sum
4
go.sum
|
@ -221,6 +221,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
|
|||
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
|
||||
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
|
||||
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 h1:q2e307iGHPdTGp0hoxKjt1H5pDo6utceo3dQVK3I5XQ=
|
||||
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5/go.mod h1:jvVRKCrJTQWu0XVbaOlby/2lO20uSCHEMzzplHXte1o=
|
||||
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
|
@ -251,6 +253,8 @@ github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0
|
|||
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0=
|
||||
github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM=
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
|
||||
|
|
|
@ -3,8 +3,8 @@ package msgbus
|
|||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
sync "github.com/sasha-s/go-deadlock"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
|
189
msgbus.go
189
msgbus.go
|
@ -10,10 +10,10 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/andybalholm/brotli"
|
||||
sync "github.com/sasha-s/go-deadlock"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
@ -54,7 +54,7 @@ type HandlerFunc func(msg *Message) error
|
|||
// Topic ...
|
||||
type Topic struct {
|
||||
Name string `json:"name"`
|
||||
Sequence uint64 `json:"seq"`
|
||||
Sequence int `json:"seq"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
|
||||
|
@ -64,19 +64,32 @@ func (t *Topic) String() string {
|
|||
|
||||
// Message ...
|
||||
type Message struct {
|
||||
ID uint64 `json:"id"`
|
||||
ID int `json:"id"`
|
||||
Topic *Topic `json:"topic"`
|
||||
Payload []byte `json:"payload"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
|
||||
// ListenerOptions ...
|
||||
type ListenerOptions struct {
|
||||
// SubscribeOption ...
|
||||
type SubscribeOption func(*SubscriberOptions)
|
||||
|
||||
// SubscriberOptions ...
|
||||
type SubscriberOptions struct {
|
||||
Index int
|
||||
}
|
||||
|
||||
// WithIndex sets the index to start subscribing from
|
||||
func WithIndex(index int) SubscribeOption {
|
||||
return func(o *SubscriberOptions) { o.Index = index }
|
||||
}
|
||||
|
||||
// SubscribersConfig ...
|
||||
type SubscriberConfig struct {
|
||||
BufferLength int
|
||||
}
|
||||
|
||||
// Listeners ...
|
||||
type Listeners struct {
|
||||
// Subscribers ...
|
||||
type Subscribers struct {
|
||||
sync.RWMutex
|
||||
|
||||
buflen int
|
||||
|
@ -85,19 +98,19 @@ type Listeners struct {
|
|||
chs map[string]chan Message
|
||||
}
|
||||
|
||||
// NewListeners ...
|
||||
func NewListeners(options *ListenerOptions) *Listeners {
|
||||
// NewSubscribers ...
|
||||
func NewSubscribers(config *SubscriberConfig) *Subscribers {
|
||||
var (
|
||||
bufferLength int
|
||||
)
|
||||
|
||||
if options != nil {
|
||||
bufferLength = options.BufferLength
|
||||
if config != nil {
|
||||
bufferLength = config.BufferLength
|
||||
} else {
|
||||
bufferLength = DefaultBufferLength
|
||||
}
|
||||
|
||||
return &Listeners{
|
||||
return &Subscribers{
|
||||
buflen: bufferLength,
|
||||
|
||||
ids: make(map[string]bool),
|
||||
|
@ -105,50 +118,57 @@ func NewListeners(options *ListenerOptions) *Listeners {
|
|||
}
|
||||
}
|
||||
|
||||
// Length ...
|
||||
func (ls *Listeners) Length() int {
|
||||
ls.RLock()
|
||||
defer ls.RUnlock()
|
||||
// Len ...
|
||||
func (subs *Subscribers) Len() int {
|
||||
subs.RLock()
|
||||
defer subs.RUnlock()
|
||||
|
||||
return len(ls.ids)
|
||||
return len(subs.ids)
|
||||
}
|
||||
|
||||
// Add ...
|
||||
func (ls *Listeners) Add(id string) chan Message {
|
||||
ls.Lock()
|
||||
defer ls.Unlock()
|
||||
// AddSubscriber ...
|
||||
func (subs *Subscribers) AddSubscriber(id string) chan Message {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
ls.ids[id] = true
|
||||
ls.chs[id] = make(chan Message, ls.buflen)
|
||||
return ls.chs[id]
|
||||
if subs.buflen <= 0 {
|
||||
log.Fatal("subscriber buflen <= 0")
|
||||
}
|
||||
|
||||
ch := make(chan Message, subs.buflen)
|
||||
|
||||
subs.ids[id] = true
|
||||
subs.chs[id] = ch
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// Remove ...
|
||||
func (ls *Listeners) Remove(id string) {
|
||||
ls.Lock()
|
||||
defer ls.Unlock()
|
||||
// RemoveSubscriber ...
|
||||
func (subs *Subscribers) RemoveSubscriber(id string) {
|
||||
subs.Lock()
|
||||
defer subs.Unlock()
|
||||
|
||||
delete(ls.ids, id)
|
||||
delete(subs.ids, id)
|
||||
|
||||
close(ls.chs[id])
|
||||
delete(ls.chs, id)
|
||||
close(subs.chs[id])
|
||||
delete(subs.chs, id)
|
||||
}
|
||||
|
||||
// Exists ...
|
||||
func (ls *Listeners) Exists(id string) bool {
|
||||
ls.RLock()
|
||||
defer ls.RUnlock()
|
||||
// HasSubscriber ...
|
||||
func (subs *Subscribers) HasSubscriber(id string) bool {
|
||||
subs.RLock()
|
||||
defer subs.RUnlock()
|
||||
|
||||
_, ok := ls.ids[id]
|
||||
_, ok := subs.ids[id]
|
||||
return ok
|
||||
}
|
||||
|
||||
// Get ...
|
||||
func (ls *Listeners) Get(id string) (chan Message, bool) {
|
||||
ls.RLock()
|
||||
defer ls.RUnlock()
|
||||
// GetSubscriber ...
|
||||
func (subs *Subscribers) GetSubscriber(id string) (chan Message, bool) {
|
||||
subs.RLock()
|
||||
defer subs.RUnlock()
|
||||
|
||||
ch, ok := ls.chs[id]
|
||||
ch, ok := subs.chs[id]
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
@ -156,12 +176,12 @@ func (ls *Listeners) Get(id string) (chan Message, bool) {
|
|||
}
|
||||
|
||||
// NotifyAll ...
|
||||
func (ls *Listeners) NotifyAll(message Message) int {
|
||||
ls.RLock()
|
||||
defer ls.RUnlock()
|
||||
func (subs *Subscribers) NotifyAll(message Message) int {
|
||||
subs.RLock()
|
||||
defer subs.RUnlock()
|
||||
|
||||
i := 0
|
||||
for id, ch := range ls.chs {
|
||||
for id, ch := range subs.chs {
|
||||
select {
|
||||
case ch <- message:
|
||||
i++
|
||||
|
@ -193,9 +213,9 @@ type MessageBus struct {
|
|||
maxQueueSize int
|
||||
maxPayloadSize int
|
||||
|
||||
topics map[string]*Topic
|
||||
queues map[*Topic]*Queue
|
||||
listeners map[*Topic]*Listeners
|
||||
topics map[string]*Topic
|
||||
queues map[*Topic]*Queue
|
||||
subscribers map[*Topic]*Subscribers
|
||||
}
|
||||
|
||||
// New ...
|
||||
|
@ -312,9 +332,9 @@ func New(options *Options) *MessageBus {
|
|||
maxQueueSize: maxQueueSize,
|
||||
maxPayloadSize: maxPayloadSize,
|
||||
|
||||
topics: make(map[string]*Topic),
|
||||
queues: make(map[*Topic]*Queue),
|
||||
listeners: make(map[*Topic]*Listeners),
|
||||
topics: make(map[string]*Topic),
|
||||
queues: make(map[*Topic]*Queue),
|
||||
subscribers: make(map[*Topic]*Subscribers),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -406,20 +426,20 @@ func (mb *MessageBus) Get(t *Topic) (Message, bool) {
|
|||
|
||||
// publish ...
|
||||
func (mb *MessageBus) publish(message Message) {
|
||||
ls, ok := mb.listeners[message.Topic]
|
||||
ls, ok := mb.subscribers[message.Topic]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
n := ls.NotifyAll(message)
|
||||
if n != ls.Length() && mb.metrics != nil {
|
||||
log.Warnf("%d/%d subscribers notified", n, ls.Length())
|
||||
mb.metrics.Counter("bus", "dropped").Add(float64(ls.Length() - n))
|
||||
if n != ls.Len() && mb.metrics != nil {
|
||||
log.Warnf("%d/%d subscribers notified", n, ls.Len())
|
||||
mb.metrics.Counter("bus", "dropped").Add(float64(ls.Len() - n))
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe ...
|
||||
func (mb *MessageBus) Subscribe(id, topic string) chan Message {
|
||||
func (mb *MessageBus) Subscribe(id, topic string, opts ...SubscribeOption) chan Message {
|
||||
mb.Lock()
|
||||
defer mb.Unlock()
|
||||
|
||||
|
@ -429,15 +449,16 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
|
|||
mb.topics[topic] = t
|
||||
}
|
||||
|
||||
ls, ok := mb.listeners[t]
|
||||
subs, ok := mb.subscribers[t]
|
||||
if !ok {
|
||||
ls = NewListeners(&ListenerOptions{BufferLength: mb.bufferLength})
|
||||
mb.listeners[t] = ls
|
||||
subs = NewSubscribers(&SubscriberConfig{BufferLength: mb.bufferLength})
|
||||
mb.subscribers[t] = subs
|
||||
}
|
||||
|
||||
if ls.Exists(id) {
|
||||
if subs.HasSubscriber(id) {
|
||||
// Already verified the listener exists
|
||||
ch, _ := ls.Get(id)
|
||||
log.Debugf("already have subscriber %s", id)
|
||||
ch, _ := subs.GetSubscriber(id)
|
||||
return ch
|
||||
}
|
||||
|
||||
|
@ -445,7 +466,34 @@ func (mb *MessageBus) Subscribe(id, topic string) chan Message {
|
|||
mb.metrics.Gauge("bus", "subscribers").Inc()
|
||||
}
|
||||
|
||||
return ls.Add(id)
|
||||
o := &SubscriberOptions{}
|
||||
for _, opt := range opts {
|
||||
opt(o)
|
||||
}
|
||||
|
||||
ch := subs.AddSubscriber(id)
|
||||
q, ok := mb.queues[t]
|
||||
if !ok {
|
||||
log.Debug("nothing in queue, returning ch")
|
||||
return ch
|
||||
}
|
||||
|
||||
if o.Index >= 0 && o.Index <= q.Len() {
|
||||
var n int
|
||||
log.Debugf("subscriber wants to start from %d", o.Index)
|
||||
q.ForEach(func(item interface{}) error {
|
||||
msg := item.(Message)
|
||||
log.Debugf("found #%v", msg)
|
||||
if msg.ID >= o.Index {
|
||||
ch <- msg
|
||||
n++
|
||||
}
|
||||
return nil
|
||||
})
|
||||
log.Debugf("published %d messages", n)
|
||||
}
|
||||
|
||||
return ch
|
||||
}
|
||||
|
||||
// Unsubscribe ...
|
||||
|
@ -458,14 +506,14 @@ func (mb *MessageBus) Unsubscribe(id, topic string) {
|
|||
return
|
||||
}
|
||||
|
||||
ls, ok := mb.listeners[t]
|
||||
ls, ok := mb.subscribers[t]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if ls.Exists(id) {
|
||||
if ls.HasSubscriber(id) {
|
||||
// Already verified the listener exists
|
||||
ls.Remove(id)
|
||||
ls.RemoveSubscriber(id)
|
||||
|
||||
if mb.metrics != nil {
|
||||
mb.metrics.Gauge("bus", "subscribers").Dec()
|
||||
|
@ -560,7 +608,9 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
NewClient(conn, t, mb).Start()
|
||||
i := SafeParseInt(r.URL.Query().Get("index"), -1)
|
||||
|
||||
NewClient(conn, t, i, mb).Start()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -591,6 +641,7 @@ func (mb *MessageBus) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
type Client struct {
|
||||
conn *websocket.Conn
|
||||
topic *Topic
|
||||
index int
|
||||
bus *MessageBus
|
||||
|
||||
id string
|
||||
|
@ -598,8 +649,8 @@ type Client struct {
|
|||
}
|
||||
|
||||
// NewClient ...
|
||||
func NewClient(conn *websocket.Conn, topic *Topic, bus *MessageBus) *Client {
|
||||
return &Client{conn: conn, topic: topic, bus: bus}
|
||||
func NewClient(conn *websocket.Conn, topic *Topic, index int, bus *MessageBus) *Client {
|
||||
return &Client{conn: conn, topic: topic, index: index, bus: bus}
|
||||
}
|
||||
|
||||
func (c *Client) readPump() {
|
||||
|
@ -684,7 +735,7 @@ func (c *Client) writePump() {
|
|||
// Start ...
|
||||
func (c *Client) Start() {
|
||||
c.id = c.conn.RemoteAddr().String()
|
||||
c.ch = c.bus.Subscribe(c.id, c.topic.Name)
|
||||
c.ch = c.bus.Subscribe(c.id, c.topic.Name, WithIndex(c.index))
|
||||
|
||||
c.conn.SetCloseHandler(func(code int, text string) error {
|
||||
c.bus.Unsubscribe(c.id, c.topic.Name)
|
||||
|
|
|
@ -4,16 +4,23 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"nhooyr.io/websocket"
|
||||
"nhooyr.io/websocket/wsjson"
|
||||
)
|
||||
|
||||
var (
|
||||
debug = flag.Bool("d", false, "enable debug logging")
|
||||
)
|
||||
|
||||
func TestMessageBusLen(t *testing.T) {
|
||||
mb := New(nil)
|
||||
assert.Equal(t, mb.Len(), 0)
|
||||
|
@ -24,7 +31,7 @@ func TestMessage(t *testing.T) {
|
|||
assert.Equal(t, mb.Len(), 0)
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
expected := Message{Topic: topic, Payload: []byte("bar")}
|
||||
expected := mb.NewMessage(topic, []byte("bar"))
|
||||
mb.Put(expected)
|
||||
|
||||
actual, ok := mb.Get(topic)
|
||||
|
@ -32,6 +39,28 @@ func TestMessage(t *testing.T) {
|
|||
assert.Equal(t, actual, expected)
|
||||
}
|
||||
|
||||
func TestMessageIds(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
mb := New(nil)
|
||||
assert.Equal(0, mb.Len())
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
expected := mb.NewMessage(topic, []byte("bar"))
|
||||
mb.Put(expected)
|
||||
|
||||
actual, ok := mb.Get(topic)
|
||||
require.True(ok)
|
||||
assert.Equal(expected, actual)
|
||||
|
||||
mb.Put(mb.NewMessage(topic, []byte("bar")))
|
||||
msg, ok := mb.Get(topic)
|
||||
require.True(ok)
|
||||
assert.Equal(msg.ID, 1)
|
||||
|
||||
}
|
||||
|
||||
func TestMessageGetEmpty(t *testing.T) {
|
||||
mb := New(nil)
|
||||
assert.Equal(t, mb.Len(), 0)
|
||||
|
@ -45,7 +74,7 @@ func TestMessageGetEmpty(t *testing.T) {
|
|||
func TestMessageBusPutGet(t *testing.T) {
|
||||
mb := New(nil)
|
||||
topic := mb.NewTopic("foo")
|
||||
expected := Message{Topic: topic, Payload: []byte("foo")}
|
||||
expected := mb.NewMessage(topic, []byte("foo"))
|
||||
mb.Put(expected)
|
||||
|
||||
actual, ok := mb.Get(topic)
|
||||
|
@ -59,13 +88,39 @@ func TestMessageBusSubscribe(t *testing.T) {
|
|||
msgs := mb.Subscribe("id1", "foo")
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
expected := Message{Topic: topic, Payload: []byte("foo")}
|
||||
expected := mb.NewMessage(topic, []byte("foo"))
|
||||
mb.Put(expected)
|
||||
|
||||
actual := <-msgs
|
||||
assert.Equal(t, actual, expected)
|
||||
}
|
||||
|
||||
func TestMessageBusSubscribeWithIndex(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
mb := New(nil)
|
||||
|
||||
msgs := mb.Subscribe("id1", "foo")
|
||||
|
||||
topic := mb.NewTopic("foo")
|
||||
expected := mb.NewMessage(topic, []byte("foo"))
|
||||
mb.Put(expected)
|
||||
|
||||
actual := <-msgs
|
||||
assert.Equal(expected, actual)
|
||||
assert.Equal(0, actual.ID)
|
||||
|
||||
mb.Unsubscribe("id1", "foo")
|
||||
|
||||
mb.Put(mb.NewMessage(topic, []byte("bar"))) // ID == 1
|
||||
mb.Put(mb.NewMessage(topic, []byte("baz"))) // ID == 2
|
||||
|
||||
msgs = mb.Subscribe("id1", "foo", WithIndex(1))
|
||||
|
||||
assert.Equal([]byte("bar"), (<-msgs).Payload)
|
||||
assert.Equal([]byte("baz"), (<-msgs).Payload)
|
||||
}
|
||||
|
||||
func TestServeHTTPGETIndexEmpty(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
|
@ -83,8 +138,8 @@ func TestServeHTTPGETTopics(t *testing.T) {
|
|||
|
||||
mb := New(nil)
|
||||
|
||||
mb.Put(Message{Topic: mb.NewTopic("foo"), Payload: []byte("foo")})
|
||||
mb.Put(Message{Topic: mb.NewTopic("hello"), Payload: []byte("hello world")})
|
||||
mb.Put(mb.NewMessage(mb.NewTopic("foo"), []byte("foo")))
|
||||
mb.Put(mb.NewMessage(mb.NewTopic("hello"), []byte("hello world")))
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
r, _ := http.NewRequest("GET", "/", nil)
|
||||
|
@ -178,11 +233,11 @@ func TestServeHTTPSubscriber(t *testing.T) {
|
|||
s := httptest.NewServer(mb)
|
||||
defer s.Close()
|
||||
|
||||
msgs := make(chan *Message)
|
||||
msgs := make(chan Message, 10)
|
||||
ready := make(chan bool, 1)
|
||||
|
||||
consumer := func() {
|
||||
var msg *Message
|
||||
var msg Message
|
||||
|
||||
// u := fmt.Sprintf("ws%s/hello", strings.TrimPrefix(s.URL, "http"))
|
||||
ws, _, err := websocket.Dial(context.Background(), s.URL+"/hello", nil)
|
||||
|
@ -221,11 +276,11 @@ func TestServeHTTPSubscriberReconnect(t *testing.T) {
|
|||
|
||||
s := httptest.NewServer(mb)
|
||||
|
||||
msgs := make(chan *Message)
|
||||
msgs := make(chan Message, 10)
|
||||
ready := make(chan bool, 1)
|
||||
|
||||
consumer := func() {
|
||||
var msg *Message
|
||||
var msg Message
|
||||
|
||||
ws, _, err := websocket.Dial(context.Background(), s.URL+"/hello", nil)
|
||||
require.NoError(err)
|
||||
|
@ -273,7 +328,7 @@ func TestMsgBusMetrics(t *testing.T) {
|
|||
func BenchmarkMessageBusPut(b *testing.B) {
|
||||
mb := New(nil)
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := Message{Topic: topic, Payload: []byte("foo")}
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
|
@ -283,7 +338,7 @@ func BenchmarkMessageBusPut(b *testing.B) {
|
|||
func BenchmarkMessageBusGet(b *testing.B) {
|
||||
mb := New(nil)
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := Message{Topic: topic, Payload: []byte("foo")}
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
}
|
||||
|
@ -305,10 +360,24 @@ func BenchmarkMessageBusGetEmpty(b *testing.B) {
|
|||
func BenchmarkMessageBusPutGet(b *testing.B) {
|
||||
mb := New(nil)
|
||||
topic := mb.NewTopic("foo")
|
||||
msg := Message{Topic: topic, Payload: []byte("foo")}
|
||||
msg := mb.NewMessage(topic, []byte("foo"))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
mb.Put(msg)
|
||||
mb.Get(topic)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Parse()
|
||||
|
||||
if *debug {
|
||||
log.SetLevel(log.DebugLevel)
|
||||
} else {
|
||||
log.SetLevel(log.WarnLevel)
|
||||
}
|
||||
|
||||
result := m.Run()
|
||||
|
||||
os.Exit(result)
|
||||
}
|
||||
|
|
20
queue.go
20
queue.go
|
@ -1,7 +1,8 @@
|
|||
package msgbus
|
||||
|
||||
import (
|
||||
"sync"
|
||||
sync "github.com/sasha-s/go-deadlock"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// minCapacity is the smallest capacity that queue may have.
|
||||
|
@ -61,6 +62,23 @@ func (q *Queue) Empty() bool {
|
|||
return q.count == 0
|
||||
}
|
||||
|
||||
// ForEach applys the function `f` over each item in the queue for read-only
|
||||
// access into the queue in O(n) time for indexining into the queue.
|
||||
func (q *Queue) ForEach(f func(elem interface{}) error) error {
|
||||
q.Lock()
|
||||
defer q.Unlock()
|
||||
|
||||
log.Debugf("buf: #%v", q.buf)
|
||||
log.Debugf("q.tail: %d", q.tail)
|
||||
log.Debugf("q.head: %d", q.head)
|
||||
for i := q.head; i < q.tail; i++ {
|
||||
if err := f(q.buf[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Full returns true if the queue is full false otherwise
|
||||
func (q *Queue) Full() bool {
|
||||
q.RLock()
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package msgbus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEmpty(t *testing.T) {
|
||||
|
@ -27,6 +29,31 @@ func TestSimple(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestForEach(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
q := NewQueue()
|
||||
|
||||
ys := []int{0, 1, 2, 3}
|
||||
for _, y := range ys {
|
||||
q.Push(y)
|
||||
}
|
||||
assert.Equal(q.Len(), 4)
|
||||
|
||||
var xs []int
|
||||
err := q.ForEach(func(e interface{}) error {
|
||||
i, ok := e.(int)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected type %T", e)
|
||||
}
|
||||
xs = append(xs, i)
|
||||
return nil
|
||||
})
|
||||
require.NoError(err)
|
||||
assert.Equal(ys, xs)
|
||||
}
|
||||
|
||||
func TestMaxLen(t *testing.T) {
|
||||
q := Queue{maxlen: minCapacity}
|
||||
assert.Equal(t, q.MaxLen(), minCapacity)
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
package msgbus
|
||||
|
||||
import "strconv"
|
||||
|
||||
// SafeParseInt ...
|
||||
func SafeParseInt(s string, d int) int {
|
||||
n, e := strconv.Atoi(s)
|
||||
if e != nil {
|
||||
return d
|
||||
}
|
||||
return n
|
||||
}
|
Notiek ielāde…
Atsaukties uz šo jaunā problēmā